Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource: aws_rds_cluster_activity_stream #14243

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions aws/internal/service/rds/waiter/status.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package waiter

import (
"fmt"
"log"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/rds/finder"
)
Expand Down Expand Up @@ -58,3 +62,35 @@ func DBProxyEndpointStatus(conn *rds.RDS, id string) resource.StateRefreshFunc {
return output, aws.StringValue(output.Status), nil
}
}

// ActivityStreamStatus fetches the RDS Cluster Activity Stream Status
func ActivityStreamStatus(conn *rds.RDS, dbClusterIdentifier string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
emptyResp := &rds.DescribeDBClustersInput{}

resp, err := conn.DescribeDBClusters(&rds.DescribeDBClustersInput{
DBClusterIdentifier: aws.String(dbClusterIdentifier),
})

if err != nil {
log.Printf("[DEBUG] Refreshing RDS Cluster Activity Stream State. Occur error: %s", err)
if tfawserr.ErrCodeContains(err, rds.ErrCodeDBClusterNotFoundFault) {
return emptyResp, rds.ActivityStreamStatusStopped, nil
} else if resp != nil && len(resp.DBClusters) == 0 {
return emptyResp, rds.ActivityStreamStatusStopped, nil
} else {
return emptyResp, "", fmt.Errorf("error on refresh: %+v", err)
}
}

if resp == nil || resp.DBClusters == nil || len(resp.DBClusters) == 0 {
log.Printf("[DEBUG] Refreshing RDS Cluster Activity Stream State. Invalid resp: %s", resp)
return emptyResp, rds.ActivityStreamStatusStopped, nil
}

cluster := resp.DBClusters[0]
status := aws.StringValue(cluster.ActivityStreamStatus)
log.Printf("[DEBUG] Refreshing RDS Cluster Activity Stream State... %s", status)
return cluster, status, nil
}
}
48 changes: 48 additions & 0 deletions aws/internal/service/rds/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package waiter

import (
"fmt"
"log"
"time"

"github.com/aws/aws-sdk-go/service/rds"
Expand All @@ -11,6 +13,12 @@ const (
// Maximum amount of time to wait for an EventSubscription to return Deleted
EventSubscriptionDeletedTimeout = 10 * time.Minute
RdsClusterInitiateUpgradeTimeout = 5 * time.Minute

// Delay time to retry fetch RDS Cluster Activity Stream Status
ActivityStreamRetryDelay = 5 * time.Second

// Minimum timeout to retry fetch RDS Cluster Activity Stream Status
ActivityStreamRetryMinTimeout = 3 * time.Second
)

// EventSubscriptionDeleted waits for a EventSubscription to return Deleted
Expand Down Expand Up @@ -69,3 +77,43 @@ func DBProxyEndpointDeleted(conn *rds.RDS, id string, timeout time.Duration) (*r

return nil, err
}

// ActivityStreamStarted waits for RDS Cluster Activity Stream to be started
func ActivityStreamStarted(conn *rds.RDS, dbClusterIdentifier string, timeout time.Duration) error {
log.Printf("[DEBUG] Waiting for RDS Cluster Activity Stream %s to become started...", dbClusterIdentifier)

stateConf := &resource.StateChangeConf{
Pending: []string{rds.ActivityStreamStatusStarting},
Target: []string{rds.ActivityStreamStatusStarted},
Refresh: ActivityStreamStatus(conn, dbClusterIdentifier),
Timeout: timeout,
Delay: ActivityStreamRetryDelay,
MinTimeout: ActivityStreamRetryMinTimeout,
}

_, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf("error waiting for RDS Cluster Activity Stream (%s) to be started: %v", dbClusterIdentifier, err)
}
return nil
}

// ActivityStreamStarted waits for RDS Cluster Activity Stream to be stopped
func ActivityStreamStopped(conn *rds.RDS, dbClusterIdentifier string, timeout time.Duration) error {
log.Printf("[DEBUG] Waiting for RDS Cluster Activity Stream %s to become stopped...", dbClusterIdentifier)

stateConf := &resource.StateChangeConf{
Pending: []string{rds.ActivityStreamStatusStopping},
Target: []string{rds.ActivityStreamStatusStopped},
Refresh: ActivityStreamStatus(conn, dbClusterIdentifier),
Timeout: timeout,
Delay: ActivityStreamRetryDelay,
MinTimeout: ActivityStreamRetryMinTimeout,
}

_, err := stateConf.WaitForState()
if err != nil {
return fmt.Errorf("error waiting for RDS Cluster Activity Stream (%s) to be stopped: %v", dbClusterIdentifier, err)
}
return nil
}
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,7 @@ func Provider() *schema.Provider {
"aws_rds_cluster_endpoint": resourceAwsRDSClusterEndpoint(),
"aws_rds_cluster_instance": resourceAwsRDSClusterInstance(),
"aws_rds_cluster_parameter_group": resourceAwsRDSClusterParameterGroup(),
"aws_rds_cluster_activity_stream": resourceAwsRDSClusterActivityStream(),
"aws_rds_global_cluster": resourceAwsRDSGlobalCluster(),
"aws_redshift_cluster": resourceAwsRedshiftCluster(),
"aws_redshift_security_group": resourceAwsRedshiftSecurityGroup(),
Expand Down
166 changes: 166 additions & 0 deletions aws/resource_aws_rds_cluster_activity_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package aws

import (
"fmt"
"log"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/rds"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/rds/waiter"
)

func resourceAwsRDSClusterActivityStream() *schema.Resource {
return &schema.Resource{
Create: resourceAwsRDSClusterActivityStreamCreate,
Read: resourceAwsRDSClusterActivityStreamRead,
Delete: resourceAwsRDSClusterActivityStreamDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(120 * time.Minute),
Delete: schema.DefaultTimeout(120 * time.Minute),
},

Schema: map[string]*schema.Schema{
"resource_arn": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validateArn,
},
"kms_key_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"mode": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
ValidateFunc: validation.StringInSlice([]string{
rds.ActivityStreamModeSync,
rds.ActivityStreamModeAsync,
}, false),
},
"kinesis_stream_name": {
Type: schema.TypeString,
Computed: true,
},
},
}
}

func resourceAwsRDSClusterActivityStreamCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).rdsconn

resourceArn := d.Get("resource_arn").(string)
kmsKeyId := d.Get("kms_key_id").(string)
mode := d.Get("mode").(string)

startActivityStreamInput := &rds.StartActivityStreamInput{
ResourceArn: aws.String(resourceArn),
ApplyImmediately: aws.Bool(true),
KmsKeyId: aws.String(kmsKeyId),
Mode: aws.String(mode),
}

log.Printf("[DEBUG] RDS Cluster start activity stream input: %s", startActivityStreamInput)

resp, err := conn.StartActivityStream(startActivityStreamInput)
if err != nil {
return fmt.Errorf("error creating RDS Cluster Activity Stream: %s", err)
}

log.Printf("[DEBUG]: RDS Cluster start activity stream response: %s", resp)

d.SetId(resourceArn)

err = waiter.ActivityStreamStarted(conn, d.Id(), d.Timeout(schema.TimeoutCreate))
if err != nil {
return err
}

return resourceAwsRDSClusterActivityStreamRead(d, meta)
}

func resourceAwsRDSClusterActivityStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).rdsconn

input := &rds.DescribeDBClustersInput{
DBClusterIdentifier: aws.String(d.Id()),
}

log.Printf("[DEBUG] Describing RDS Cluster: %s", input)
resp, err := conn.DescribeDBClusters(input)

if isAWSErr(err, rds.ErrCodeDBClusterNotFoundFault, "") {
log.Printf("[WARN] RDS Cluster (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if err != nil {
return fmt.Errorf("error describing RDS Cluster (%s): %s", d.Id(), err)
}

if resp == nil {
return fmt.Errorf("error retrieving RDS cluster: empty response for: %s", input)
}

var dbc *rds.DBCluster
for _, c := range resp.DBClusters {
if aws.StringValue(c.DBClusterArn) == d.Id() {
dbc = c
break
}
}

if dbc == nil {
log.Printf("[WARN] RDS Cluster (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if aws.StringValue(dbc.ActivityStreamStatus) == rds.ActivityStreamStatusStopped {
log.Printf("[WARN] RDS Cluster (%s) Activity Stream already stopped, removing from state", d.Id())
d.SetId("")
return nil
}

d.Set("resource_arn", dbc.DBClusterArn)
d.Set("kms_key_id", dbc.ActivityStreamKmsKeyId)
d.Set("kinesis_stream_name", dbc.ActivityStreamKinesisStreamName)
d.Set("mode", dbc.ActivityStreamMode)

return nil
}

func resourceAwsRDSClusterActivityStreamDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).rdsconn

stopActivityStreamInput := &rds.StopActivityStreamInput{
ApplyImmediately: aws.Bool(true),
ResourceArn: aws.String(d.Id()),
}

log.Printf("[DEBUG] RDS Cluster stop activity stream input: %s", stopActivityStreamInput)

resp, err := conn.StopActivityStream(stopActivityStreamInput)
if err != nil {
return fmt.Errorf("error stopping RDS Cluster Activity Stream: %s", err)
}

log.Printf("[DEBUG] RDS Cluster stop activity stream response: %s", resp)

err = waiter.ActivityStreamStopped(conn, d.Id(), d.Timeout(schema.TimeoutDelete))
if err != nil {
return err
}

return nil
}
Loading