Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify kinesis stream as the source of a aws_kinesis_firehose_delivery_stream #1605

Merged
merged 5 commits into from
Oct 12, 2017
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,28 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"source_configuration": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep "kinesis_source_configuration", since we can have other sources potentially in the future. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, I'll update

Type: schema.TypeList,
ForceNew: true,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"kinesis_stream_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},
},
},
},

"destination": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -445,6 +467,16 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
}
}

func createSourceConfig(source map[string]interface{}) *firehose.KinesisStreamSourceConfiguration {

configuration := &firehose.KinesisStreamSourceConfiguration{
KinesisStreamARN: aws.String(source["kinesis_stream_arn"].(string)),
RoleARN: aws.String(source["role_arn"].(string)),
}

return configuration
}

func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})

Expand Down Expand Up @@ -804,6 +836,14 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
DeliveryStreamName: aws.String(sn),
}

if v, ok := d.GetOk("source_configuration"); ok {
sourceConfig := createSourceConfig(v.([]interface{})[0].(map[string]interface{}))
createInput.KinesisStreamSourceConfiguration = sourceConfig
createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeKinesisStreamAsSource)
} else {
createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeDirectPut)
}

if d.Get("destination").(string) == "extended_s3" {
extendedS3Config := createExtendedS3Config(d)
createInput.ExtendedS3DestinationConfiguration = extendedS3Config
Expand Down
93 changes: 93 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3basic(t *testing.T) {
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3KinesisStreamSource(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
config := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3KinesisStreamSource,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri, ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri)

resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
{
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil),
),
},
},
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3WithCloudwatchLogging(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
Expand Down Expand Up @@ -614,6 +636,62 @@ EOF

`

const testAccFirehoseKinesisStreamSource = `
resource "aws_kinesis_stream" "source" {
name = "terraform-kinesis-source-stream-basictest-%d"
shard_count = 1
}

resource "aws_iam_role" "kinesis_source" {
name = "tf_acctest_kinesis_source_role_%d"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "%s"
}
}
}
]
}
EOF
}

resource "aws_iam_role_policy" "kinesis_source" {
name = "tf_acctest_kinesis_source_policy_%d"
role = "${aws_iam_role.kinesis_source.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": [
"arn:aws:kinesis:*:*:stream/terraform-kinesis-source-stream-basictest-%d"
]
}
]
}
EOF
}

`

func testAccKinesisFirehoseDeliveryStreamConfig_s3WithCloudwatchLogging(accountId string, rInt int) string {
return fmt.Sprintf(`
resource "aws_iam_role" "firehose" {
Expand Down Expand Up @@ -719,6 +797,21 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
}
}`

var testAccKinesisFirehoseDeliveryStreamConfig_s3KinesisStreamSource = testAccKinesisFirehoseDeliveryStreamBaseConfig + testAccFirehoseKinesisStreamSource + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose", "aws_iam_role_policy.kinesis_source"]
name = "terraform-kinesis-firehose-basictest-%d"
source_configuration {
kinesis_stream_arn = "${aws_kinesis_stream.source.arn}"
role_arn = "${aws_iam_role.kinesis_source.arn}"
}
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
}`

var testAccKinesisFirehoseDeliveryStreamConfig_s3Updates = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
Expand Down
5 changes: 5 additions & 0 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ The following arguments are supported:

* `name` - (Required) A name to identify the stream. This is unique to the
AWS account and region the Stream is created in.
* `source_configuration` - (Optional) Allows the ability to specify the kinesis stream that is used as the source of the firehose delivery stream.
* `destination` – (Required) This is the destination to where the data is delivered. The only options are `s3` (Deprecated, use `extended_s3` instead), `extended_s3`, `redshift`, and `elasticsearch`.
* `s3_configuration` - (Optional, Deprecated, see/use `extended_s3_configuration` unless `destination` is `redshift`) Configuration options for the s3 destination (or the intermediate bucket if the destination
is redshift). More details are given below.
Expand All @@ -215,6 +216,10 @@ is redshift). More details are given below.
Using `redshift_configuration` requires the user to also specify a
`s3_configuration` block. More details are given below.

The `source_configuration` object supports the following:
* `kinesis_stream_arn` (Required) The kinesis stream used as the source of the firehose delivery stream.
* `role_arn` (Required) The ARN of the role that provides access to the source Kinesis stream.

The `s3_configuration` object supports the following:

* `role_arn` - (Required) The ARN of the AWS credentials.
Expand Down