Skip to content

Commit

Permalink
Merge pull request #24395 from jalavoy/main
Browse files Browse the repository at this point in the history
Add Kafka support to IOT topic rules
  • Loading branch information
ewbankkit authored May 5, 2022
2 parents 75d1c61 + 8b059a1 commit 2f150ab
Show file tree
Hide file tree
Showing 14 changed files with 3,363 additions and 1,092 deletions.
3 changes: 3 additions & 0 deletions .changelog/16087.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_iot_topic_rule: Add `http` and `error_action.http` arguments
```
3 changes: 3 additions & 0 deletions .changelog/19175.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_iot_topic_rule: Add `s3.canned_acl` and `error_action.s3.canned_acl` arguments
```
3 changes: 3 additions & 0 deletions .changelog/22337.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_iot_topic_rule: Add `timestream` and `error_action.timestream` arguments
```
7 changes: 7 additions & 0 deletions .changelog/24395.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:enhancement
resource/aws_iot_topic_rule: Add `kafka` and `error_action.kafka` arguments
```

```release-note:new-resource
aws_iot_topic_rule_destination
```
1 change: 1 addition & 0 deletions internal/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -1548,6 +1548,7 @@ func Provider() *schema.Provider {
"aws_iot_thing_principal_attachment": iot.ResourceThingPrincipalAttachment(),
"aws_iot_thing_type": iot.ResourceThingType(),
"aws_iot_topic_rule": iot.ResourceTopicRule(),
"aws_iot_topic_rule_destination": iot.ResourceTopicRuleDestination(),

"aws_msk_cluster": kafka.ResourceCluster(),
"aws_msk_configuration": kafka.ResourceConfiguration(),
Expand Down
1 change: 1 addition & 0 deletions internal/service/ec2/sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ func init() {
"aws_fsx_ontap_file_system",
"aws_fsx_openzfs_file_system",
"aws_fsx_windows_file_system",
"aws_iot_topic_rule_destination",
"aws_lambda_function",
"aws_lb",
"aws_memorydb_subnet_group",
Expand Down
104 changes: 104 additions & 0 deletions internal/service/iot/find.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package iot

import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/iot"
"github.com/hashicorp/aws-sdk-go-base/v2/awsv1shim/v2/tfawserr"
Expand Down Expand Up @@ -119,3 +121,105 @@ func FindThingGroupMembership(conn *iot.IoT, thingGroupName, thingName string) e

return nil
}

func FindTopicRuleByName(conn *iot.IoT, name string) (*iot.GetTopicRuleOutput, error) {
// GetTopicRule returns unhelpful errors such as
// "An error occurred (UnauthorizedException) when calling the GetTopicRule operation: Access to topic rule 'xxxxxxxx' was denied"
// when querying for a rule that doesn't exist.
var rule *iot.TopicRuleListItem

err := conn.ListTopicRulesPages(&iot.ListTopicRulesInput{}, func(page *iot.ListTopicRulesOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, v := range page.Rules {
if v == nil {
continue
}

if aws.StringValue(v.RuleName) == name {
rule = v

return false
}
}

return !lastPage
})

if err != nil {
return nil, err
}

if rule == nil {
return nil, tfresource.NewEmptyResultError(name)
}

input := &iot.GetTopicRuleInput{
RuleName: aws.String(name),
}

output, err := conn.GetTopicRule(input)

if err != nil {
return nil, err
}

if output == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output, nil
}

func FindTopicRuleDestinationByARN(ctx context.Context, conn *iot.IoT, arn string) (*iot.TopicRuleDestination, error) {
// GetTopicRuleDestination returns unhelpful errors such as
// "UnauthorizedException: Access to TopicRuleDestination 'arn:aws:iot:us-west-2:123456789012:ruledestination/vpc/f267138a-7383-4670-9e44-a7fe2f48af5e' was denied"
// when querying for a rule destination that doesn't exist.
var destination *iot.TopicRuleDestinationSummary

err := conn.ListTopicRuleDestinationsPages(&iot.ListTopicRuleDestinationsInput{}, func(page *iot.ListTopicRuleDestinationsOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, v := range page.DestinationSummaries {
if v == nil {
continue
}

if aws.StringValue(v.Arn) == arn {
destination = v

return false
}
}

return !lastPage
})

if err != nil {
return nil, err
}

if destination == nil {
return nil, tfresource.NewEmptyResultError(destination)
}

input := &iot.GetTopicRuleDestinationInput{
Arn: aws.String(arn),
}

output, err := conn.GetTopicRuleDestinationWithContext(ctx, input)

if err != nil {
return nil, err
}

if output == nil || output.TopicRuleDestination == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output.TopicRuleDestination, nil
}
53 changes: 51 additions & 2 deletions internal/service/iot/sweep.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,14 @@ func init() {
})

resource.AddTestSweepers("aws_iot_topic_rule", &resource.Sweeper{
Name: "aws_iot_topic_rule",
F: sweepTopicRules,
Name: "aws_iot_topic_rule",
F: sweepTopicRules,
Dependencies: []string{"aws_iot_topic_rule_destination"},
})

resource.AddTestSweepers("aws_iot_topic_rule_destination", &resource.Sweeper{
Name: "aws_iot_topic_rule_destination",
F: sweepTopicRuleDestinations,
})
}

Expand Down Expand Up @@ -523,3 +529,46 @@ func sweepThingGroups(region string) error {

return nil
}

func sweepTopicRuleDestinations(region string) error {
client, err := sweep.SharedRegionalSweepClient(region)
if err != nil {
return fmt.Errorf("error getting client: %w", err)
}
conn := client.(*conns.AWSClient).IoTConn
input := &iot.ListTopicRuleDestinationsInput{}
sweepResources := make([]*sweep.SweepResource, 0)

err = conn.ListTopicRuleDestinationsPages(input, func(page *iot.ListTopicRuleDestinationsOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, v := range page.DestinationSummaries {
r := ResourceTopicRuleDestination()
d := r.Data(nil)
d.SetId(aws.StringValue(v.Arn))

sweepResources = append(sweepResources, sweep.NewSweepResource(r, d, client))
}

return !lastPage
})

if sweep.SkipSweepError(err) {
log.Printf("[WARN] Skipping IoT Topic Rule Destination sweep for %s: %s", region, err)
return nil
}

if err != nil {
return fmt.Errorf("error listing IoT Topic Rule Destinations (%s): %w", region, err)
}

err = sweep.SweepOrchestrator(sweepResources)

if err != nil {
return fmt.Errorf("error sweeping IoT Topic Rule Destinations (%s): %w", region, err)
}

return nil
}
Loading

0 comments on commit 2f150ab

Please sign in to comment.