Skip to content

Commit

Permalink
Specify kinesis stream as the source of a aws_kinesis_firehose_delive…
Browse files Browse the repository at this point in the history
…ry_stream (#1605)

* First attempt at resolving #1601

* Removing unused method

* Add acceptance test for aws_kinesis_firehose_delivery_stream using Kinesis Stream as source

* Updated "source_configuration" to be "kinesis_source_configuration"
  • Loading branch information
pleschev authored and Ninir committed Oct 12, 2017
1 parent be4f724 commit de474a8
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 0 deletions.
40 changes: 40 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,28 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"kinesis_source_configuration": {
Type: schema.TypeList,
ForceNew: true,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"kinesis_stream_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},

"role_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},
},
},
},

"destination": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -445,6 +467,16 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
}
}

func createSourceConfig(source map[string]interface{}) *firehose.KinesisStreamSourceConfiguration {

configuration := &firehose.KinesisStreamSourceConfiguration{
KinesisStreamARN: aws.String(source["kinesis_stream_arn"].(string)),
RoleARN: aws.String(source["role_arn"].(string)),
}

return configuration
}

func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})

Expand Down Expand Up @@ -812,6 +844,14 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
DeliveryStreamName: aws.String(sn),
}

if v, ok := d.GetOk("kinesis_source_configuration"); ok {
sourceConfig := createSourceConfig(v.([]interface{})[0].(map[string]interface{}))
createInput.KinesisStreamSourceConfiguration = sourceConfig
createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeKinesisStreamAsSource)
} else {
createInput.DeliveryStreamType = aws.String(firehose.DeliveryStreamTypeDirectPut)
}

if d.Get("destination").(string) == "extended_s3" {
extendedS3Config := createExtendedS3Config(d)
createInput.ExtendedS3DestinationConfiguration = extendedS3Config
Expand Down
93 changes: 93 additions & 0 deletions aws/resource_aws_kinesis_firehose_delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3basic(t *testing.T) {
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3KinesisStreamSource(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
config := fmt.Sprintf(testAccKinesisFirehoseDeliveryStreamConfig_s3KinesisStreamSource,
ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri, ri, os.Getenv("AWS_ACCOUNT_ID"), ri, ri, ri)

resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
{
Config: config,
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil, nil),
),
},
},
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3WithCloudwatchLogging(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()
Expand Down Expand Up @@ -634,6 +656,62 @@ EOF
`

const testAccFirehoseKinesisStreamSource = `
resource "aws_kinesis_stream" "source" {
name = "terraform-kinesis-source-stream-basictest-%d"
shard_count = 1
}
resource "aws_iam_role" "kinesis_source" {
name = "tf_acctest_kinesis_source_role_%d"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "%s"
}
}
}
]
}
EOF
}
resource "aws_iam_role_policy" "kinesis_source" {
name = "tf_acctest_kinesis_source_policy_%d"
role = "${aws_iam_role.kinesis_source.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"kinesis:DescribeStream",
"kinesis:GetShardIterator",
"kinesis:GetRecords"
],
"Resource": [
"arn:aws:kinesis:*:*:stream/terraform-kinesis-source-stream-basictest-%d"
]
}
]
}
EOF
}
`

func testAccKinesisFirehoseDeliveryStreamConfig_s3WithCloudwatchLogging(accountId string, rInt int) string {
return fmt.Sprintf(`
resource "aws_iam_role" "firehose" {
Expand Down Expand Up @@ -739,6 +817,21 @@ resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
}
}`

var testAccKinesisFirehoseDeliveryStreamConfig_s3KinesisStreamSource = testAccKinesisFirehoseDeliveryStreamBaseConfig + testAccFirehoseKinesisStreamSource + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose", "aws_iam_role_policy.kinesis_source"]
name = "terraform-kinesis-firehose-basictest-%d"
kinesis_source_configuration {
kinesis_stream_arn = "${aws_kinesis_stream.source.arn}"
role_arn = "${aws_iam_role.kinesis_source.arn}"
}
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
}
}`

var testAccKinesisFirehoseDeliveryStreamConfig_s3Updates = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
Expand Down
5 changes: 5 additions & 0 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ The following arguments are supported:

* `name` - (Required) A name to identify the stream. This is unique to the
AWS account and region the Stream is created in.
* `kinesis_source_configuration` - (Optional) Allows the ability to specify the kinesis stream that is used as the source of the firehose delivery stream.
* `destination` – (Required) This is the destination to where the data is delivered. The only options are `s3` (Deprecated, use `extended_s3` instead), `extended_s3`, `redshift`, and `elasticsearch`.
* `s3_configuration` - (Optional, Deprecated, see/use `extended_s3_configuration` unless `destination` is `redshift`) Configuration options for the s3 destination (or the intermediate bucket if the destination
is redshift). More details are given below.
Expand All @@ -215,6 +216,10 @@ is redshift). More details are given below.
Using `redshift_configuration` requires the user to also specify a
`s3_configuration` block. More details are given below.

The `kinesis_source_configuration` object supports the following:
* `kinesis_stream_arn` (Required) The kinesis stream used as the source of the firehose delivery stream.
* `role_arn` (Required) The ARN of the role that provides access to the source Kinesis stream.

The `s3_configuration` object supports the following:

* `role_arn` - (Required) The ARN of the AWS credentials.
Expand Down

0 comments on commit de474a8

Please sign in to comment.