Skip to content

Commit

Permalink
Merge pull request #35630 from szesch/b-dynamodb_describe_table_per_r…
Browse files Browse the repository at this point in the history
…eplica

Fix dynamodb make a DescribeTable call per replica
  • Loading branch information
ewbankkit authored Feb 6, 2024
2 parents 85ed56a + 92c8257 commit 21d5ee4
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 267 deletions.
3 changes: 3 additions & 0 deletions .changelog/35630.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
resource/aws_dynamodb_table: Ensure that `replica`s are always set on Read
```
8 changes: 8 additions & 0 deletions internal/service/dynamodb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package dynamodb

const (
errCodeValidationException = "ValidationException"
)
5 changes: 4 additions & 1 deletion internal/service/dynamodb/exports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ package dynamodb

// Exports for use in tests only.
var (
ListTags = listTags
ResourceKinesisStreamingDestination = resourceKinesisStreamingDestination

FindKinesisDataStreamDestinationByTwoPartKey = findKinesisDataStreamDestinationByTwoPartKey
ListTags = listTags
)
31 changes: 0 additions & 31 deletions internal/service/dynamodb/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,6 @@ import (
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
)

func FindKinesisDataStreamDestination(ctx context.Context, conn *dynamodb.DynamoDB, streamArn, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}

output, err := conn.DescribeKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

var result *dynamodb.KinesisDataStreamDestination

for _, destination := range output.KinesisDataStreamDestinations {
if destination == nil {
continue
}

if aws.StringValue(destination.StreamArn) == streamArn {
result = destination
break
}
}

return result, nil
}

func FindTableByName(ctx context.Context, conn *dynamodb.DynamoDB, name string) (*dynamodb.TableDescription, error) {
input := &dynamodb.DescribeTableInput{
TableName: aws.String(name),
Expand Down
201 changes: 155 additions & 46 deletions internal/service/dynamodb/kinesis_streaming_destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,31 @@ package dynamodb

import (
"context"
"fmt"
"errors"
"log"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/errs"
"github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
"github.com/hashicorp/terraform-provider-aws/internal/flex"
tfslices "github.com/hashicorp/terraform-provider-aws/internal/slices"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// @SDKResource("aws_dynamodb_kinesis_streaming_destination")
func ResourceKinesisStreamingDestination() *schema.Resource {
const (
kinesisStreamingDestinationResourceIDPartCount = 2
)

// @SDKResource("aws_dynamodb_kinesis_streaming_destination", name="Kinesis Streaming Destination")
func resourceKinesisStreamingDestination() *schema.Resource {
return &schema.Resource{
CreateWithoutTimeout: resourceKinesisStreamingDestinationCreate,
ReadWithoutTimeout: resourceKinesisStreamingDestinationRead,
Expand Down Expand Up @@ -51,63 +60,49 @@ func resourceKinesisStreamingDestinationCreate(ctx context.Context, d *schema.Re

conn := meta.(*conns.AWSClient).DynamoDBConn(ctx)

streamArn := d.Get("stream_arn").(string)
streamARN := d.Get("stream_arn").(string)
tableName := d.Get("table_name").(string)

id := errs.Must(flex.FlattenResourceId([]string{tableName, streamARN}, kinesisStreamingDestinationResourceIDPartCount, false))
input := &dynamodb.EnableKinesisStreamingDestinationInput{
StreamArn: aws.String(streamArn),
StreamArn: aws.String(streamARN),
TableName: aws.String(tableName),
}

output, err := conn.EnableKinesisStreamingDestinationWithContext(ctx, input)
_, err := conn.EnableKinesisStreamingDestinationWithContext(ctx, input)

if err != nil {
return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): %s", streamArn, tableName, err)
return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis Streaming Destination (%s): %s", id, err)
}

if output == nil {
return sdkdiag.AppendErrorf(diags, "enabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): empty output", streamArn, tableName)
}
d.SetId(id)

if err := waitKinesisStreamingDestinationActive(ctx, conn, streamArn, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis streaming destination (stream: %s, table: %s) to be active: %s", streamArn, tableName, err)
if _, err := waitKinesisStreamingDestinationActive(ctx, conn, streamARN, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis Streaming Destination (%s) create: %s", d.Id(), err)
}

d.SetId(fmt.Sprintf("%s,%s", aws.StringValue(output.TableName), aws.StringValue(output.StreamArn)))

return append(diags, resourceKinesisStreamingDestinationRead(ctx, d, meta)...)
}

func resourceKinesisStreamingDestinationRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics

conn := meta.(*conns.AWSClient).DynamoDBConn(ctx)

tableName, streamArn, err := KinesisStreamingDestinationParseID(d.Id())

parts, err := flex.ExpandResourceId(d.Id(), kinesisStreamingDestinationResourceIDPartCount, false)
if err != nil {
return sdkdiag.AppendFromErr(diags, err)
}

output, err := FindKinesisDataStreamDestination(ctx, conn, streamArn, tableName)
tableName, streamARN := parts[0], parts[1]
output, err := findKinesisDataStreamDestinationByTwoPartKey(ctx, conn, streamARN, tableName)

if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (stream: %s, table: %s) not found, removing from state", streamArn, tableName)
if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (%s) not found, removing from state", d.Id())
d.SetId("")
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "retrieving DynamoDB Kinesis streaming destination (stream: %s, table: %s): %s", streamArn, tableName, err)
}

if output == nil || aws.StringValue(output.DestinationStatus) == dynamodb.DestinationStatusDisabled {
if d.IsNewResource() {
return sdkdiag.AppendErrorf(diags, "retrieving DynamoDB Kinesis streaming destination (stream: %s, table: %s): empty output after creation", streamArn, tableName)
}
log.Printf("[WARN] DynamoDB Kinesis Streaming Destination (stream: %s, table: %s) not found, removing from state", streamArn, tableName)
d.SetId("")
return diags
return sdkdiag.AppendErrorf(diags, "reading DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
}

d.Set("stream_arn", output.StreamArn)
Expand All @@ -121,36 +116,150 @@ func resourceKinesisStreamingDestinationDelete(ctx context.Context, d *schema.Re

conn := meta.(*conns.AWSClient).DynamoDBConn(ctx)

tableName, streamArn, err := KinesisStreamingDestinationParseID(d.Id())

parts, err := flex.ExpandResourceId(d.Id(), kinesisStreamingDestinationResourceIDPartCount, false)
if err != nil {
return sdkdiag.AppendFromErr(diags, err)
}

input := &dynamodb.DisableKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
StreamArn: aws.String(streamArn),
tableName, streamARN := parts[0], parts[1]
_, err = findKinesisDataStreamDestinationByTwoPartKey(ctx, conn, streamARN, tableName)

if tfresource.NotFound(err) {
return diags
}

_, err = conn.DisableKinesisStreamingDestinationWithContext(ctx, input)
log.Printf("[DEBUG] Deleting DynamoDB Kinesis Streaming Destination: %s", d.Id())
_, err = conn.DisableKinesisStreamingDestinationWithContext(ctx, &dynamodb.DisableKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
StreamArn: aws.String(streamARN),
})

if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
return diags
}

if err != nil {
return sdkdiag.AppendErrorf(diags, "disabling DynamoDB Kinesis streaming destination (stream: %s, table: %s): %s", streamArn, tableName, err)
return sdkdiag.AppendErrorf(diags, "disabling DynamoDB Kinesis Streaming Destination (%s): %s", d.Id(), err)
}

if err := waitKinesisStreamingDestinationDisabled(ctx, conn, streamArn, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis streaming destination (stream: %s, table: %s) to be disabled: %s", streamArn, tableName, err)
if _, err := waitKinesisStreamingDestinationDisabled(ctx, conn, streamARN, tableName); err != nil {
return sdkdiag.AppendErrorf(diags, "waiting for DynamoDB Kinesis Streaming Destination (%s) delete: %s", d.Id(), err)
}

return diags
}

func KinesisStreamingDestinationParseID(id string) (string, string, error) {
parts := strings.SplitN(id, ",", 2)
func kinesisDataStreamDestinationForStream(arn string) tfslices.Predicate[*dynamodb.KinesisDataStreamDestination] {
return func(v *dynamodb.KinesisDataStreamDestination) bool {
return aws.StringValue(v.StreamArn) == arn
}
}

func findKinesisDataStreamDestinationByTwoPartKey(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}
output, err := findKinesisDataStreamDestination(ctx, conn, input, kinesisDataStreamDestinationForStream(streamARN))

if err != nil {
return nil, err
}

if aws.StringValue(output.DestinationStatus) == dynamodb.DestinationStatusDisabled {
return nil, &retry.NotFoundError{}
}

return output, nil
}

func findKinesisDataStreamDestination(ctx context.Context, conn *dynamodb.DynamoDB, input *dynamodb.DescribeKinesisStreamingDestinationInput, filter tfslices.Predicate[*dynamodb.KinesisDataStreamDestination]) (*dynamodb.KinesisDataStreamDestination, error) {
output, err := findKinesisDataStreamDestinations(ctx, conn, input, filter)

if err != nil {
return nil, err
}

return tfresource.AssertSinglePtrResult(output)
}

func findKinesisDataStreamDestinations(ctx context.Context, conn *dynamodb.DynamoDB, input *dynamodb.DescribeKinesisStreamingDestinationInput, filter tfslices.Predicate[*dynamodb.KinesisDataStreamDestination]) ([]*dynamodb.KinesisDataStreamDestination, error) {
output, err := conn.DescribeKinesisStreamingDestinationWithContext(ctx, input)

if tfawserr.ErrCodeEquals(err, dynamodb.ErrCodeResourceNotFoundException) {
return nil, &retry.NotFoundError{
LastError: err,
LastRequest: input,
}
}

if err != nil {
return nil, err
}

if output == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return tfslices.Filter(output.KinesisDataStreamDestinations, filter), nil
}

func statusKinesisStreamingDestination(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) retry.StateRefreshFunc {
return func() (interface{}, string, error) {
input := &dynamodb.DescribeKinesisStreamingDestinationInput{
TableName: aws.String(tableName),
}
output, err := findKinesisDataStreamDestination(ctx, conn, input, kinesisDataStreamDestinationForStream(streamARN))

if tfresource.NotFound(err) {
return nil, "", nil
}

if err != nil {
return nil, "", err
}

return output, aws.StringValue(output.DestinationStatus), nil
}
}

func waitKinesisStreamingDestinationActive(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
const (
timeout = 5 * time.Minute
)
stateConf := &retry.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusDisabled, dynamodb.DestinationStatusEnabling},
Target: []string{dynamodb.DestinationStatusActive},
Timeout: timeout,
Refresh: statusKinesisStreamingDestination(ctx, conn, streamARN, tableName),
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if output, ok := outputRaw.(*dynamodb.KinesisDataStreamDestination); ok {
tfresource.SetLastError(err, errors.New(aws.StringValue(output.DestinationStatusDescription)))
return output, err
}

return nil, err
}

func waitKinesisStreamingDestinationDisabled(ctx context.Context, conn *dynamodb.DynamoDB, streamARN, tableName string) (*dynamodb.KinesisDataStreamDestination, error) {
const (
timeout = 5 * time.Minute
)
stateConf := &retry.StateChangeConf{
Pending: []string{dynamodb.DestinationStatusActive, dynamodb.DestinationStatusDisabling},
Target: []string{dynamodb.DestinationStatusDisabled},
Timeout: timeout,
Refresh: statusKinesisStreamingDestination(ctx, conn, streamARN, tableName),
}

outputRaw, err := stateConf.WaitForStateContext(ctx)

if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", fmt.Errorf("unexpected format of ID (%s), expected TABLE_NAME,STREAM_ARN", id)
if output, ok := outputRaw.(*dynamodb.KinesisDataStreamDestination); ok {
tfresource.SetLastError(err, errors.New(aws.StringValue(output.DestinationStatusDescription)))
return output, err
}

return parts[0], parts[1], nil
return nil, err
}
Loading

0 comments on commit 21d5ee4

Please sign in to comment.