Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Snowflake destination buffering hint settings for aws_kinesis_firehose_delivery_stream #39214

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/39214.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_kinesis_firehose_delivery_stream: Add `snowflake_configuration.buffering_internal` and `snowflake_configuration.buffering_size` arguments
```
33 changes: 31 additions & 2 deletions internal/service/firehose/delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,18 @@ func resourceDeliveryStream() *schema.Resource {
Type: schema.TypeString,
Required: true,
},
"buffering_interval": {
Type: schema.TypeInt,
Optional: true,
Default: 0,
ValidateFunc: validation.IntBetween(0, 900),
},
"buffering_size": {
Type: schema.TypeInt,
Optional: true,
Default: 1,
ValidateFunc: validation.IntBetween(1, 128),
},
"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
"content_column_name": {
Type: schema.TypeString,
Expand Down Expand Up @@ -2728,7 +2740,11 @@ func expandAmazonOpenSearchServerlessDestinationUpdate(oss map[string]interface{
func expandSnowflakeDestinationConfiguration(tfMap map[string]interface{}) *types.SnowflakeDestinationConfiguration {
roleARN := tfMap[names.AttrRoleARN].(string)
apiObject := &types.SnowflakeDestinationConfiguration{
AccountUrl: aws.String(tfMap["account_url"].(string)),
AccountUrl: aws.String(tfMap["account_url"].(string)),
BufferingHints: &types.SnowflakeBufferingHints{
IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))),
SizeInMBs: aws.Int32(int32(tfMap["buffering_size"].(int))),
},
Database: aws.String(tfMap[names.AttrDatabase].(string)),
RetryOptions: expandSnowflakeRetryOptions(tfMap),
RoleARN: aws.String(roleARN),
Expand Down Expand Up @@ -2792,7 +2808,11 @@ func expandSnowflakeDestinationConfiguration(tfMap map[string]interface{}) *type
func expandSnowflakeDestinationUpdate(tfMap map[string]interface{}) *types.SnowflakeDestinationUpdate {
roleARN := tfMap[names.AttrRoleARN].(string)
apiObject := &types.SnowflakeDestinationUpdate{
AccountUrl: aws.String(tfMap["account_url"].(string)),
AccountUrl: aws.String(tfMap["account_url"].(string)),
BufferingHints: &types.SnowflakeBufferingHints{
IntervalInSeconds: aws.Int32(int32(tfMap["buffering_interval"].(int))),
SizeInMBs: aws.Int32(int32(tfMap["buffering_size"].(int))),
},
Database: aws.String(tfMap[names.AttrDatabase].(string)),
RetryOptions: expandSnowflakeRetryOptions(tfMap),
RoleARN: aws.String(roleARN),
Expand Down Expand Up @@ -3556,6 +3576,15 @@ func flattenSnowflakeDestinationDescription(apiObject *types.SnowflakeDestinatio
"user": aws.ToString(apiObject.User),
}

if v := apiObject.BufferingHints; v != nil {
if v.IntervalInSeconds != nil {
tfMap["buffering_interval"] = int(aws.ToInt32(v.IntervalInSeconds))
}
if v.SizeInMBs != nil {
tfMap["buffering_size"] = int(aws.ToInt32(v.SizeInMBs))
}
}

if apiObject.RetryOptions != nil {
tfMap["retry_duration"] = int(aws.ToInt32(apiObject.RetryOptions.DurationInSeconds))
}
Expand Down
23 changes: 16 additions & 7 deletions internal/service/firehose/delivery_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", acctest.Ct0),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
Expand Down Expand Up @@ -1193,6 +1195,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", "900"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", "128"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
Expand Down Expand Up @@ -1267,6 +1271,8 @@ func TestAccFirehoseDeliveryStream_snowflakeUpdates(t *testing.T) {
resource.TestCheckResourceAttr(resourceName, "server_side_encryption.0.key_type", "AWS_OWNED_CMK"),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.account_url", fmt.Sprintf("https://%s.snowflakecomputing.com", rName)),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_interval", acctest.Ct0),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.buffering_size", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.enabled", acctest.CtFalse),
resource.TestCheckResourceAttr(resourceName, "snowflake_configuration.0.cloudwatch_logging_options.0.log_group_name", ""),
Expand Down Expand Up @@ -3997,13 +4003,15 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
destination = "snowflake"

snowflake_configuration {
account_url = "https://%[1]s.snowflakecomputing.com"
database = "test-db"
private_key = "%[2]s"
role_arn = aws_iam_role.firehose.arn
schema = "test-schema"
table = "test-table"
user = "test-usr"
account_url = "https://%[1]s.snowflakecomputing.com"
buffering_interval = 900
buffering_size = 128
database = "test-db"
private_key = "%[2]s"
role_arn = aws_iam_role.firehose.arn
schema = "test-schema"
table = "test-table"
user = "test-usr"

s3_configuration {
role_arn = aws_iam_role.firehose.arn
Expand Down Expand Up @@ -4043,6 +4051,7 @@ resource "aws_kinesis_firehose_delivery_stream" "test" {
}
}
}

`, rName, acctest.TLSPEMRemoveRSAPrivateKeyEncapsulationBoundaries(acctest.TLSPEMRemoveNewlines(privateKey))))
}

Expand Down
18 changes: 11 additions & 7 deletions website/docs/r/kinesis_firehose_delivery_stream.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,15 @@ resource "aws_kinesis_firehose_delivery_stream" "example_snowflake_destination"
destination = "snowflake"

snowflake_configuration {
account_url = "https://example.snowflakecomputing.com"
database = "example-db"
private_key = "..."
role_arn = aws_iam_role.firehose.arn
schema = "example-schema"
table = "example-table"
user = "example-usr"
account_url = "https://example.snowflakecomputing.com"
buffering_size = 15
buffering_interval = 600
database = "example-db"
private_key = "..."
role_arn = aws_iam_role.firehose.arn
schema = "example-schema"
table = "example-table"
user = "example-usr"

s3_configuration {
role_arn = aws_iam_role.firehose.arn
Expand Down Expand Up @@ -796,6 +798,8 @@ The `http_endpoint_configuration` configuration block supports the following arg
The `snowflake_configuration` configuration block supports the following arguments:

* `account_url` - (Required) The URL of the Snowflake account. Format: https://[account_identifier].snowflakecomputing.com.
* `buffering_size` - (Optional) Buffer incoming data to the specified size, in MBs between 1 to 128, before delivering it to the destination. The default value is 1MB.
* `buffering_interval` - (Optional) Buffer incoming data for the specified period of time, in seconds between 0 to 900, before delivering it to the destination. The default value is 0s.
* `private_key` - (Optional) The private key for authentication. This value is required if `secrets_manager_configuration` is not provided.
* `key_passphrase` - (Optional) The passphrase for the private key.
* `user` - (Optional) The user for authentication. This value is required if `secrets_manager_configuration` is not provided.
Expand Down
Loading