Skip to content

Commit

Permalink
feat: Add Snowflake destination buffering hint settings for aws_kines…
Browse files Browse the repository at this point in the history
…is_firehose_delivery_stream
  • Loading branch information
acwwat committed Sep 8, 2024
1 parent ea069cf commit 0a60cab
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 16 deletions.
3 changes: 3 additions & 0 deletions .changelog/39214.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_kinesis_firehose_delivery_stream: Add `snowflake_configuration.buffering_internal` and `snowflake_configuration.buffering_size` arguments
```
33 changes: 31 additions & 2 deletions internal/service/firehose/delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,18 @@ func resourceDeliveryStream() *schema.Resource {
Type: schema.TypeString,
Required: true,
},
"buffering_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
ValidateFunc: validation.IntBetween(0, 900),
},
"buffering_size": {
Type: schema.TypeInt,
Optional: true,
Default: 1,
ValidateFunc: validation.IntBetween(1, 128),
},
"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
"content_column_name": {
Type: schema.TypeString,
Expand Down Expand Up @@ -2728,7 +2740,11 @@ func expandAmazonOpenSearchServerlessDestinationUpdate(oss map[string]interface{
func expandSnowflakeDestinationConfiguration(tfMap map[string]interface{}) *types.SnowflakeDestinationConfiguration {
roleARN := tfMap[names.AttrRoleARN].(string)
apiObject := &types.SnowflakeDestinationConfiguration{
AccountUrl: aws.String(tfMap["account_url"].(string)),
AccountUrl: aws.String(tfMap["account_url"].(string)),
BufferingHints: &types.SnowflakeBufferingHints{
IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))),
SizeInMBs: aws.Int32(int32(tfMap["buffering_size"].(int))),
},
Database: aws.String(tfMap[names.AttrDatabase].(string)),
RetryOptions: expandSnowflakeRetryOptions(tfMap),
RoleARN: aws.String(roleARN),
Expand Down Expand Up @@ -2792,7 +2808,11 @@ func expandSnowflakeDestinationConfiguration(tfMap map[string]interface{}) *type
func expandSnowflakeDestinationUpdate(tfMap map[string]interface{}) *types.SnowflakeDestinationUpdate {
roleARN := tfMap[names.AttrRoleARN].(string)
apiObject := &types.SnowflakeDestinationUpdate{
AccountUrl: aws.String(tfMap["account_url"].(string)),
AccountUrl: aws.String(tfMap["account_url"].(string)),
BufferingHints: &types.SnowflakeBufferingHints{
IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))),
SizeInMBs: aws.Int32(int32(tfMap["buffering_size"].(int))),
},
Database: aws.String(tfMap[names.AttrDatabase].(string)),
RetryOptions: expandSnowflakeRetryOptions(tfMap),
RoleARN: aws.String(roleARN),
Expand Down Expand Up @@ -3556,6 +3576,15 @@ func flattenSnowflakeDestinationDescription(apiObject *types.SnowflakeDestinatio
"user": aws.ToString(apiObject.User),
}

if v := apiObject.BufferingHints; v != nil {
if v.IntervalInSeconds != nil {
tfMap["buffering_interval"] = int(aws.ToInt32(v.IntervalInSeconds))
}
if v.SizeInMBs != nil {
tfMap["buffering_size"] = int(aws.ToInt32(v.SizeInMBs))
}
}

if apiObject.RetryOptions != nil {
tfMap["retry_duration"] = int(aws.ToInt32(apiObject.RetryOptions.DurationInSeconds))
}
Expand Down
23 changes: 16 additions & 7 deletions internal/service/firehose/delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", acctest.Ct0),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
Expand Down Expand Up @@ -1193,6 +1195,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", "900"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", "128"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
Expand Down Expand Up @@ -1267,6 +1271,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", acctest.Ct0),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
Expand Down Expand Up @@ -3997,13 +4003,15 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
destination = "snowflake"
snowflake_configuration {
account_url = "https://%[1]s.snowflakecomputing.com"
database = "test-db"
private_key = "%[2]s"
role_arn = aws_iam_role.firehose.arn
schema = "test-schema"
table = "test-table"
user = "test-usr"
account_url = "https://%[1]s.snowflakecomputing.com"
buffering_interval = 900
buffering_size = 128
database = "test-db"
private_key = "%[2]s"
role_arn = aws_iam_role.firehose.arn
schema = "test-schema"
table = "test-table"
user = "test-usr"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
Expand Down Expand Up @@ -4043,6 +4051,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
}
}
}
`, rName, acctest.TLSPEMRemoveRSAPrivateKeyEncapsulationBoundaries(acctest.TLSPEMRemoveNewlines(privateKey))))
}

Expand Down
18 changes: 11 additions & 7 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@ resource "aws_kinesis_firehose_delivery_stream" "example_snowflake_destination"
destination = "snowflake"
snowflake_configuration {
account_url = "https://example.snowflakecomputing.com"
database = "example-db"
private_key = "..."
role_arn = aws_iam_role.firehose.arn
schema = "example-schema"
table = "example-table"
user = "example-usr"
account_url = "https://example.snowflakecomputing.com"
buffering_size = 15
buffering_interval = 600
database = "example-db"
private_key = "..."
role_arn = aws_iam_role.firehose.arn
schema = "example-schema"
table = "example-table"
user = "example-usr"
s3_configuration {
role_arn = aws_iam_role.firehose.arn
Expand Down Expand Up @@ -796,6 +798,8 @@ The `http_endpoint_configuration` configuration block supports the following arg
The `snowflake_configuration` configuration block supports the following arguments:

* `account_url` - (Required) The URL of the Snowflake account. Format: https://[account_identifier].snowflakecomputing.com.
* `buffering_size` - (Optional) Buffer incoming data to the specified size, in MBs between 1 to 128, before delivering it to the destination. The default value is 1MB.
* `buffering_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds between 0 to 900, before delivering it to the destination. The default value is 0s.
* `private_key` - (Optional) The private key for authentication. This value is required if `secrets_manager_configuration` is not provided.
* `key_passphrase` - (Optional) The passphrase for the private key.
* `user` - (Optional) The user for authentication. This value is required if `secrets_manager_configuration` is not provided.
Expand Down

0 comments on commit 0a60cab

Please sign in to comment.