Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add resource aws_sqs_queue_redrive_policy #26733

Merged
merged 15 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,8 +2052,9 @@ func New(_ context.Context) (*schema.Provider, error) {
"aws_sns_topic_policy": sns.ResourceTopicPolicy(),
"aws_sns_topic_subscription": sns.ResourceTopicSubscription(),

"aws_sqs_queue": sqs.ResourceQueue(),
"aws_sqs_queue_policy": sqs.ResourceQueuePolicy(),
"aws_sqs_queue": sqs.ResourceQueue(),
"aws_sqs_queue_policy": sqs.ResourceQueuePolicy(),
"aws_sqs_queue_redrive_policy": sqs.ResourceQueueRedrivePolicy(),

"aws_ssm_activation": ssm.ResourceActivation(),
"aws_ssm_association": ssm.ResourceAssociation(),
Expand Down
2 changes: 1 addition & 1 deletion internal/service/acm/sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func sweepCertificates(region string) error {
for _, iub := range output.Certificate.InUseBy {
m[aws.StringValue(iub)[:77]] = ""
}
for k, _ := range m {
for k := range m {
log.Printf("[INFO] %s...", k)
}
continue
Expand Down
147 changes: 147 additions & 0 deletions internal/service/sqs/attribute_funcs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package sqs

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
"log"
)

func generateQueueAttributeUpsertFunc(attributeName string) func(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
return func(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).SQSConn

attrValue, err := structure.NormalizeJsonString(d.Get(getSchemaKey(attributeName)).(string))

if err != nil {
return diag.FromErr(fmt.Errorf("%s (%s) is invalid JSON: %w", attributeName, d.Get(getSchemaKey(attributeName)).(string), err))
}

var attributes map[string]string

switch attributeName {
case sqs.QueueAttributeNamePolicy:
attributes = map[string]string{
sqs.QueueAttributeNamePolicy: attrValue,
}
case sqs.QueueAttributeNameRedrivePolicy:
attributes = map[string]string{
sqs.QueueAttributeNameRedrivePolicy: attrValue,
}
default:
return diag.FromErr(fmt.Errorf("%s is an invalid SQS Queue attribute name", attributeName))
}

url := d.Get("queue_url").(string)
input := &sqs.SetQueueAttributesInput{
Attributes: aws.StringMap(attributes),
QueueUrl: aws.String(url),
}

log.Printf("[DEBUG] Setting SQS Queue Attribute '%s': %s", attributeName, input)
_, err = conn.SetQueueAttributesWithContext(ctx, input)

if err != nil {
return diag.FromErr(fmt.Errorf("error setting SQS Queue Attribute '%s' (%s): %w", attributeName, url, err))
}

d.SetId(url)

err = waitQueueAttributesPropagatedWithContext(ctx, conn, d.Id(), attributes)

if err != nil {
return diag.FromErr(fmt.Errorf("error waiting for SQS Queue Attribute '%s' (%s) to be set: %w", attributeName, d.Id(), err))
}

return generateQueueAttributeReadFunc(attributeName)(ctx, d, meta)
}
}
func generateQueueAttributeReadFunc(attributeName string) func(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
return func(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).SQSConn

outputRaw, err := tfresource.RetryWhenNotFound(queueAttributeReadTimeout, func() (interface{}, error) {
return FindQueueAttributeByURL(ctx, conn, d.Id(), attributeName)
})

if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] SQS Queue Policy (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if err != nil {
return diag.FromErr(fmt.Errorf("error reading SQS Queue Attribute '%s' (%s): %w", attributeName, d.Id(), err))
}

var attributeToSet string
switch attributeName {
case sqs.QueueAttributeNamePolicy:
attributeToSet, err = verify.PolicyToSet(d.Get(getSchemaKey(attributeName)).(string), outputRaw.(string))
if err != nil {
return diag.FromErr(err)
}
case sqs.QueueAttributeNameRedrivePolicy:
if BytesEqual([]byte(d.Get(getSchemaKey(attributeName)).(string)), []byte(outputRaw.(string))) {
attributeToSet = d.Get(getSchemaKey(attributeName)).(string)
} else {
attributeToSet = outputRaw.(string)
}
default:
return diag.FromErr(fmt.Errorf("%s is an invalid SQS Queue attribute name", attributeName))
}

d.Set(getSchemaKey(attributeName), attributeToSet)

d.Set("queue_url", d.Id())

return nil
}
}

func generateQueueAttributeDeleteFunc(attributeName string) func(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
return func(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).SQSConn

log.Printf("[DEBUG] Deleting SQS Queue Attribute '%s': %s", attributeName, d.Id())

var emptyAttributes map[string]string
switch attributeName {
case sqs.QueueAttributeNamePolicy:
emptyAttributes = queueEmptyPolicyAttributes
case sqs.QueueAttributeNameRedrivePolicy:
emptyAttributes = queueEmptyRedrivePolicyAttributes
default:
return diag.FromErr(fmt.Errorf("%s is an invalid SQS Queue attribute name", attributeName))
}

_, err := conn.SetQueueAttributes(&sqs.SetQueueAttributesInput{
Attributes: aws.StringMap(emptyAttributes),
QueueUrl: aws.String(d.Id()),
})

if tfawserr.ErrCodeEquals(err, sqs.ErrCodeQueueDoesNotExist) {
return nil
}

if err != nil {
return diag.FromErr(fmt.Errorf("error deleting SQS Queue Attribute '%s' (%s): %w", attributeName, d.Id(), err))
}

err = waitQueueAttributesPropagatedWithContext(ctx, conn, d.Id(), emptyAttributes)

if err != nil {
return diag.FromErr(fmt.Errorf("error waiting for SQS Queue Attribute '%s' (%s) to delete: %w", attributeName, d.Id(), err))
}

return nil
}
}
9 changes: 5 additions & 4 deletions internal/service/sqs/find.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sqs

import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
Expand Down Expand Up @@ -36,13 +37,13 @@ func FindQueueAttributesByURL(conn *sqs.SQS, url string) (map[string]string, err
return aws.StringValueMap(output.Attributes), nil
}

func FindQueuePolicyByURL(conn *sqs.SQS, url string) (string, error) {
func FindQueueAttributeByURL(ctx context.Context, conn *sqs.SQS, url string, attributeName string) (string, error) {
input := &sqs.GetQueueAttributesInput{
AttributeNames: aws.StringSlice([]string{sqs.QueueAttributeNamePolicy}),
AttributeNames: aws.StringSlice([]string{attributeName}),
QueueUrl: aws.String(url),
}

output, err := conn.GetQueueAttributes(input)
output, err := conn.GetQueueAttributesWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, sqs.ErrCodeQueueDoesNotExist) {
return "", &resource.NotFoundError{
Expand All @@ -62,7 +63,7 @@ func FindQueuePolicyByURL(conn *sqs.SQS, url string) (string, error) {
}
}

v, ok := output.Attributes[sqs.QueueAttributeNamePolicy]
v, ok := output.Attributes[attributeName]

if !ok || aws.StringValue(v) == "" {
return "", &resource.NotFoundError{
Expand Down
1 change: 1 addition & 0 deletions internal/service/sqs/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ var (
"redrive_policy": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateFunc: validation.StringIsJSON,
StateFunc: func(v interface{}) string {
json, _ := structure.NormalizeJsonString(v)
Expand Down
111 changes: 5 additions & 106 deletions internal/service/sqs/queue_policy.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
package sqs

import (
"fmt"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

Expand All @@ -24,16 +17,15 @@ var (
func ResourceQueuePolicy() *schema.Resource {
//lintignore:R011
return &schema.Resource{
Create: resourceQueuePolicyUpsert,
Read: resourceQueuePolicyRead,
Update: resourceQueuePolicyUpsert,
Delete: resourceQueuePolicyDelete,
CreateContext: generateQueueAttributeUpsertFunc(sqs.QueueAttributeNamePolicy),
ReadContext: generateQueueAttributeReadFunc(sqs.QueueAttributeNamePolicy),
UpdateContext: generateQueueAttributeUpsertFunc(sqs.QueueAttributeNamePolicy),
DeleteContext: generateQueueAttributeDeleteFunc(sqs.QueueAttributeNamePolicy),
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
StateContext: schema.ImportStatePassthroughContext,
},
MigrateState: QueuePolicyMigrateState,
SchemaVersion: 1,

Schema: map[string]*schema.Schema{
"policy": {
Type: schema.TypeString,
Expand All @@ -54,96 +46,3 @@ func ResourceQueuePolicy() *schema.Resource {
},
}
}

func resourceQueuePolicyUpsert(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*conns.AWSClient).SQSConn

policy, err := structure.NormalizeJsonString(d.Get("policy").(string))

if err != nil {
return fmt.Errorf("policy (%s) is invalid JSON: %w", d.Get("policy").(string), err)
}

policyAttributes := map[string]string{
sqs.QueueAttributeNamePolicy: policy,
}

url := d.Get("queue_url").(string)
input := &sqs.SetQueueAttributesInput{
Attributes: aws.StringMap(policyAttributes),
QueueUrl: aws.String(url),
}

log.Printf("[DEBUG] Setting SQS Queue Policy: %s", input)
_, err = conn.SetQueueAttributes(input)

if err != nil {
return fmt.Errorf("error setting SQS Queue Policy (%s): %w", url, err)
}

d.SetId(url)

err = waitQueueAttributesPropagated(conn, d.Id(), policyAttributes)

if err != nil {
return fmt.Errorf("error waiting for SQS Queue Policy (%s) to be set: %w", d.Id(), err)
}

return resourceQueuePolicyRead(d, meta)
}

func resourceQueuePolicyRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*conns.AWSClient).SQSConn

outputRaw, err := tfresource.RetryWhenNotFound(queuePolicyReadTimeout, func() (interface{}, error) {
return FindQueuePolicyByURL(conn, d.Id())
})

if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] SQS Queue Policy (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if err != nil {
return fmt.Errorf("error reading SQS Queue Policy (%s): %w", d.Id(), err)
}

policyToSet, err := verify.PolicyToSet(d.Get("policy").(string), outputRaw.(string))

if err != nil {
return err
}

d.Set("policy", policyToSet)

d.Set("queue_url", d.Id())

return nil
}

func resourceQueuePolicyDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*conns.AWSClient).SQSConn

log.Printf("[DEBUG] Deleting SQS Queue Policy: %s", d.Id())
_, err := conn.SetQueueAttributes(&sqs.SetQueueAttributesInput{
Attributes: aws.StringMap(queueEmptyPolicyAttributes),
QueueUrl: aws.String(d.Id()),
})

if tfawserr.ErrCodeEquals(err, sqs.ErrCodeQueueDoesNotExist) {
return nil
}

if err != nil {
return fmt.Errorf("error deleting SQS Queue Policy (%s): %w", d.Id(), err)
}

err = waitQueueAttributesPropagated(conn, d.Id(), queueEmptyPolicyAttributes)

if err != nil {
return fmt.Errorf("error waiting for SQS Queue Policy (%s) to delete: %w", d.Id(), err)
}

return nil
}
43 changes: 43 additions & 0 deletions internal/service/sqs/queue_redrive_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package sqs

import (
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
)

var (
queueEmptyRedrivePolicyAttributes = map[string]string{
sqs.QueueAttributeNameRedrivePolicy: "",
}
)

func ResourceQueueRedrivePolicy() *schema.Resource {
return &schema.Resource{
Schema: map[string]*schema.Schema{
"redrive_policy": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validation.StringIsJSON,
StateFunc: func(v interface{}) string {
json, _ := structure.NormalizeJsonString(v)
return json
},
},
"queue_url": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
},
SchemaVersion: 0,
Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},
CreateContext: generateQueueAttributeUpsertFunc(sqs.QueueAttributeNameRedrivePolicy),
ReadContext: generateQueueAttributeReadFunc(sqs.QueueAttributeNameRedrivePolicy),
UpdateContext: generateQueueAttributeUpsertFunc(sqs.QueueAttributeNameRedrivePolicy),
DeleteContext: generateQueueAttributeDeleteFunc(sqs.QueueAttributeNameRedrivePolicy),
}
}
Loading