Skip to content

Commit

Permalink
Merge pull request #6548 from ewbankkit/issue-4781
Browse files Browse the repository at this point in the history
Add support for tags on Kinesis Firehose delivery streams
  • Loading branch information
bflad authored Nov 25, 2018
2 parents 9cf443f + 188aad8 commit d2847d1
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 4 deletions.
24 changes: 21 additions & 3 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,8 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"tags": tagsSchema(),

"kinesis_source_configuration": {
Type: schema.TypeList,
ForceNew: true,
Expand Down Expand Up @@ -2111,6 +2113,12 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
d.SetId(*s.DeliveryStreamARN)
d.Set("arn", s.DeliveryStreamARN)

if err := setTagsKinesisFirehose(conn, d, sn); err != nil {
return fmt.Errorf(
"Error setting for Kinesis Stream (%s) tags: %s",
sn, err)
}

return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}

Expand Down Expand Up @@ -2155,7 +2163,6 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
conn := meta.(*AWSClient).firehoseconn

sn := d.Get("name").(string)

updateInput := &firehose.UpdateDestinationInput{
DeliveryStreamName: aws.String(sn),
CurrentDeliveryStreamVersionId: aws.String(d.Get("version_id").(string)),
Expand Down Expand Up @@ -2223,19 +2230,26 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
sn, err)
}

if err := setTagsKinesisFirehose(conn, d, sn); err != nil {
return fmt.Errorf(
"Error Updating Kinesis Firehose Delivery Stream tags: \"%s\"\n%s",
sn, err)
}

return resourceAwsKinesisFirehoseDeliveryStreamRead(d, meta)
}

func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).firehoseconn

sn := d.Get("name").(string)
resp, err := conn.DescribeDeliveryStream(&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: aws.String(d.Get("name").(string)),
DeliveryStreamName: aws.String(sn),
})

if err != nil {
if isAWSErr(err, firehose.ErrCodeResourceNotFoundException, "") {
log.Printf("[WARN] Kinesis Firehose Delivery Stream (%s) not found, removing from state", d.Get("name").(string))
log.Printf("[WARN] Kinesis Firehose Delivery Stream (%s) not found, removing from state", sn)
d.SetId("")
return nil
}
Expand All @@ -2248,6 +2262,10 @@ func resourceAwsKinesisFirehoseDeliveryStreamRead(d *schema.ResourceData, meta i
return err
}

if err := getTagsKinesisFirehose(conn, d, sn); err != nil {
return err
}

return nil
}

Expand Down
77 changes: 77 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,48 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3basic(t *testing.T) {
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3basicWithTags(t *testing.T) {
var stream firehose.DeliveryStreamDescription
rInt := acctest.RandInt()
rName := fmt.Sprintf("terraform-kinesis-firehose-basictest-%d", rInt)
config := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3basic,
rInt, rInt, rInt, rInt)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
{
Config: testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithTags(rName, rInt),
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil),
resource.TestCheckResourceAttr("aws_kinesis_firehose_delivery_stream.test_stream", "tags.%", "2"),
resource.TestCheckResourceAttr("aws_kinesis_firehose_delivery_stream.test_stream", "tags.Usage", "original"),
),
},
{
Config: testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithTagsChanged(rName, rInt),
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil),
resource.TestCheckResourceAttr("aws_kinesis_firehose_delivery_stream.test_stream", "tags.%", "1"),
resource.TestCheckResourceAttr("aws_kinesis_firehose_delivery_stream.test_stream", "tags.Usage", "changed"),
),
},
{
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil, nil),
resource.TestCheckResourceAttr("aws_kinesis_firehose_delivery_stream.test_stream", "tags.%", "0"),
),
},
},
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3KinesisStreamSource(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
Expand Down Expand Up @@ -1283,6 +1325,41 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
}
}`

func testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithTags(rName string, rInt int) string {
return fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamBaseConfig, rInt, rInt, rInt) +
fmt.Sprintf(`
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
name = "%s"
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
tags {
Environment = "production"
Usage = "original"
}
}`, rName)
}

func testAccKinesisFirehoseDeliveryStreamConfig_s3basicWithTagsChanged(rName string, rInt int) string {
return fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamBaseConfig, rInt, rInt, rInt) +
fmt.Sprintf(`
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
name = "%s"
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
tags {
Usage = "changed"
}
}`, rName)
}

var testAccKinesisFirehoseDeliveryStreamConfig_s3KinesisStreamSource = testAccKinesisFirehoseDeliveryStreamBaseConfig + testAccFirehoseKinesisStreamSource + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose", "aws_iam_role_policy.kinesis_source"]
Expand Down
153 changes: 153 additions & 0 deletions aws/tagsKinesisFirehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package aws

import (
"log"
"regexp"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/hashicorp/terraform/helper/schema"
)

// getTags is a helper to get the tags for a resource. It expects the
// tags field to be named "tags"
func getTagsKinesisFirehose(conn *firehose.Firehose, d *schema.ResourceData, sn string) error {
tags := make([]*firehose.Tag, 0)
var exclusiveStartTagKey string
for {
req := &firehose.ListTagsForDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
}
if exclusiveStartTagKey != "" {
req.ExclusiveStartTagKey = aws.String(exclusiveStartTagKey)
}

resp, err := conn.ListTagsForDeliveryStream(req)
if err != nil {
return err
}

for _, tag := range resp.Tags {
tags = append(tags, tag)
}

// If HasMoreTags is true in the response, more tags are available.
// To list the remaining tags, set ExclusiveStartTagKey to the key
// of the last tag returned and call ListTagsForDeliveryStream again.
if !aws.BoolValue(resp.HasMoreTags) {
break
}
exclusiveStartTagKey = aws.StringValue(tags[len(tags)-1].Key)
}

if err := d.Set("tags", tagsToMapKinesisFirehose(tags)); err != nil {
return err
}

return nil
}

// setTags is a helper to set the tags for a resource. It expects the
// tags field to be named "tags"
func setTagsKinesisFirehose(conn *firehose.Firehose, d *schema.ResourceData, sn string) error {
if d.HasChange("tags") {
oraw, nraw := d.GetChange("tags")
o := oraw.(map[string]interface{})
n := nraw.(map[string]interface{})
create, remove := diffTagsKinesisFirehose(tagsFromMapKinesisFirehose(o), tagsFromMapKinesisFirehose(n))

// Set tags
if len(remove) > 0 {
log.Printf("[DEBUG] Removing tags: %#v", remove)
k := make([]*string, len(remove), len(remove))
for i, t := range remove {
k[i] = t.Key
}

_, err := conn.UntagDeliveryStream(&firehose.UntagDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
TagKeys: k,
})
if err != nil {
return err
}
}
if len(create) > 0 {
log.Printf("[DEBUG] Creating tags: %#v", create)
_, err := conn.TagDeliveryStream(&firehose.TagDeliveryStreamInput{
DeliveryStreamName: aws.String(sn),
Tags: create,
})
if err != nil {
return err
}
}
}

return nil
}

// diffTags takes our tags locally and the ones remotely and returns
// the set of tags that must be created, and the set of tags that must
// be destroyed.
func diffTagsKinesisFirehose(oldTags, newTags []*firehose.Tag) ([]*firehose.Tag, []*firehose.Tag) {
// First, we're creating everything we have
create := make(map[string]interface{})
for _, t := range newTags {
create[aws.StringValue(t.Key)] = aws.StringValue(t.Value)
}

// Build the list of what to remove
var remove []*firehose.Tag
for _, t := range oldTags {
old, ok := create[aws.StringValue(t.Key)]
if !ok || old != aws.StringValue(t.Value) {
// Delete it!
remove = append(remove, t)
}
}

return tagsFromMapKinesisFirehose(create), remove
}

// tagsFromMap returns the tags for the given map of data.
func tagsFromMapKinesisFirehose(m map[string]interface{}) []*firehose.Tag {
result := make([]*firehose.Tag, 0, len(m))
for k, v := range m {
t := &firehose.Tag{
Key: aws.String(k),
Value: aws.String(v.(string)),
}
if !tagIgnoredKinesisFirehose(t) {
result = append(result, t)
}
}

return result
}

// tagsToMap turns the list of tags into a map.
func tagsToMapKinesisFirehose(ts []*firehose.Tag) map[string]string {
result := make(map[string]string)
for _, t := range ts {
if !tagIgnoredKinesisFirehose(t) {
result[aws.StringValue(t.Key)] = aws.StringValue(t.Value)
}
}

return result
}

// compare a tag against a list of strings and checks if it should
// be ignored or not
func tagIgnoredKinesisFirehose(t *firehose.Tag) bool {
filter := []string{"^aws:"}
for _, v := range filter {
log.Printf("[DEBUG] Matching %v with %v\n", v, *t.Key)
if r, _ := regexp.MatchString(v, *t.Key); r == true {
log.Printf("[DEBUG] Found AWS specific tag %s (val: %s), ignoring.\n", *t.Key, *t.Value)
return true
}
}
return false
}
79 changes: 79 additions & 0 deletions aws/tagsKinesisFirehose_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package aws

import (
"reflect"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/firehose"
)

// go test -v -run="TestDiffKinesisFirehoseTags"
func TestDiffKinesisFirehoseTags(t *testing.T) {
cases := []struct {
Old, New map[string]interface{}
Create, Remove map[string]string
}{
// Basic add/remove
{
Old: map[string]interface{}{
"foo": "bar",
},
New: map[string]interface{}{
"bar": "baz",
},
Create: map[string]string{
"bar": "baz",
},
Remove: map[string]string{
"foo": "bar",
},
},

// Modify
{
Old: map[string]interface{}{
"foo": "bar",
},
New: map[string]interface{}{
"foo": "baz",
},
Create: map[string]string{
"foo": "baz",
},
Remove: map[string]string{
"foo": "bar",
},
},
}

for i, tc := range cases {
c, r := diffTagsKinesisFirehose(tagsFromMapKinesisFirehose(tc.Old), tagsFromMapKinesisFirehose(tc.New))
cm := tagsToMapKinesisFirehose(c)
rm := tagsToMapKinesisFirehose(r)
if !reflect.DeepEqual(cm, tc.Create) {
t.Fatalf("%d: bad create: %#v", i, cm)
}
if !reflect.DeepEqual(rm, tc.Remove) {
t.Fatalf("%d: bad remove: %#v", i, rm)
}
}
}

// go test -v -run="TestIgnoringTagsKinesisFirehose"
func TestIgnoringTagsKinesisFirehose(t *testing.T) {
var ignoredTags []*firehose.Tag
ignoredTags = append(ignoredTags, &firehose.Tag{
Key: aws.String("aws:cloudformation:logical-id"),
Value: aws.String("foo"),
})
ignoredTags = append(ignoredTags, &firehose.Tag{
Key: aws.String("aws:foo:bar"),
Value: aws.String("baz"),
})
for _, tag := range ignoredTags {
if !tagIgnoredKinesisFirehose(tag) {
t.Fatalf("Tag %v with value %v not ignored, but should be!", *tag.Key, *tag.Value)
}
}
}
Loading

0 comments on commit d2847d1

Please sign in to comment.