Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update: new data source aws_msk_bootstrap_brokers #32484

Merged
merged 37 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
fae3d16
update: aws msk bootstrap brokers
kaykhan Jul 12, 2023
bf1bbfb
update: aws msk bootstrap brokers
kaykhan Jul 12, 2023
7228ac1
update: aws msk bootstrap brokers
kaykhan Jul 12, 2023
bd4161e
update: add test file for aws msk bootstrap brokers
kaykhan Oct 20, 2023
c73f997
update: msk bootstrap broker fix test
kaykhan Oct 20, 2023
2dc7315
update: msk bootstrap broker fix lint
kaykhan Oct 20, 2023
64e2309
update(docs): msk bootstrap brokers
kaykhan Oct 20, 2023
87b4c67
Merge branch 'main' into HEAD
ewbankkit Jan 8, 2024
fc4a4dd
Add CHANGELOG entry.
ewbankkit Jan 8, 2024
2fda7cc
Fix 'TestAccKafkaBootstrapBrokersDataSource_basic'.
ewbankkit Jan 8, 2024
b29569f
kafka: Add and use 'findBootstrapBrokersByARN'.
ewbankkit Jan 8, 2024
8fcc5a5
d/aws_msk_bootstrap_brokers: Attribute names match 'aws_msk_cluster'.
ewbankkit Jan 9, 2024
5d3dba4
testAccBootstrapBrokersDataSourceConfig_basic: Use 'aws_msk_cluster'.
ewbankkit Jan 9, 2024
0ca09ca
Acceptance test output:
ewbankkit Jan 9, 2024
78e1ff2
kafka: AWS SDK for Go v2 exclusively.
ewbankkit Jan 9, 2024
f66eac6
Run 'make gen'.
ewbankkit Jan 9, 2024
8c08ba6
r/aws_msk_vpc_connection: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
3e20ef8
d/aws_msk_vpc_connection: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
b7af48d
r/aws_msk_configuration: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
b1be4af
d/aws_msk_configuration: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
cfeddde
d/aws_msk_kafka_version: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
fb2643e
r/aws_msk_scram_secret_association: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
1ff05af
r/aws_msk_replicator: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
90fc101
r/aws_msk_cluster: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
0785751
d/aws_msk_cluster: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
c73db12
d/aws_msk_broker_nodes: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
a34a2cf
d/aws_msk_bootstrap_brokers: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
2a134fc
r/aws_msk_serverless_cluster: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
84c5e97
kafka: Migrate sweepers to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
8e26453
r/aws_msk_cluster_policy: Migrate to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
c9ec3a9
kafka: Migrate acceptance tests to AWS SDK for Go v2.
ewbankkit Jan 9, 2024
f35b174
Fix semgrep 'ci.kafka-in-func-name'.
ewbankkit Jan 9, 2024
b2465e7
Fix semgrep 'ci.msk-in-func-name'.
ewbankkit Jan 9, 2024
26aee42
Fix 'findKafkaVersion'.
ewbankkit Jan 10, 2024
656f9b6
Acceptance test output:
ewbankkit Jan 10, 2024
662f867
Fix 'operation error Kafka: DeleteConfiguration ... BadRequestExcepti…
ewbankkit Jan 10, 2024
a73e318
Fix 'InvalidSecretArn: The provided secret ARN is invalid' errors on …
ewbankkit Jan 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/32484.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:new-data-source
aws_msk_bootstrap_brokers
```
5 changes: 0 additions & 5 deletions internal/conns/awsclient_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 96 additions & 0 deletions internal/service/kafka/bootstrap_brokers_data_source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kafka

import (
"context"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// @SDKDataSource("aws_msk_bootstrap_brokers", name="Bootstrap Brokers")
func dataSourceBootstrapBrokers() *schema.Resource {
return &schema.Resource{
ReadWithoutTimeout: dataSourceBootstrapBrokersRead,

Schema: map[string]*schema.Schema{
"bootstrap_brokers": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_public_sasl_iam": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_public_sasl_scram": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_public_tls": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_sasl_iam": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_sasl_scram": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_tls": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_vpc_connectivity_sasl_iam": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_vpc_connectivity_sasl_scram": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_vpc_connectivity_tls": {
Type: schema.TypeString,
Computed: true,
},
"cluster_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: verify.ValidARN,
},
},
}
}

func dataSourceBootstrapBrokersRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).KafkaClient(ctx)

clusterARN := d.Get("cluster_arn").(string)
output, err := findBootstrapBrokersByARN(ctx, conn, clusterARN)

if err != nil {
return sdkdiag.AppendErrorf(diags, "reading MSK Cluster (%s) bootstrap brokers: %s", clusterARN, err)
}

d.SetId(clusterARN)
d.Set("bootstrap_brokers", SortEndpointsString(aws.ToString(output.BootstrapBrokerString)))
d.Set("bootstrap_brokers_public_sasl_iam", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringPublicSaslIam)))
d.Set("bootstrap_brokers_public_sasl_scram", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringPublicSaslScram)))
d.Set("bootstrap_brokers_public_tls", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringPublicTls)))
d.Set("bootstrap_brokers_sasl_iam", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringSaslIam)))
d.Set("bootstrap_brokers_sasl_scram", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringSaslScram)))
d.Set("bootstrap_brokers_tls", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringTls)))
d.Set("bootstrap_brokers_vpc_connectivity_sasl_iam", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringVpcConnectivitySaslIam)))
d.Set("bootstrap_brokers_vpc_connectivity_sasl_scram", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringVpcConnectivitySaslScram)))
d.Set("bootstrap_brokers_vpc_connectivity_tls", SortEndpointsString(aws.ToString(output.BootstrapBrokerStringVpcConnectivityTls)))

return diags
}
74 changes: 74 additions & 0 deletions internal/service/kafka/bootstrap_brokers_data_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kafka_test

import (
"fmt"
"testing"

sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-provider-aws/internal/acctest"
"github.com/hashicorp/terraform-provider-aws/names"
)

func TestAccKafkaBootstrapBrokersDataSource_basic(t *testing.T) {
ctx := acctest.Context(t)
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
dataSourceName := "data.aws_msk_bootstrap_brokers.test"
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, names.KafkaEndpointID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
Steps: []resource.TestStep{
{
Config: testAccBootstrapBrokersDataSourceConfig_basic(rName),
Check: resource.ComposeAggregateTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers", resourceName, "bootstrap_brokers"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_public_sasl_iam", resourceName, "bootstrap_brokers_public_sasl_iam"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_public_sasl_scram", resourceName, "bootstrap_brokers_public_sasl_scram"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_public_tls", resourceName, "bootstrap_brokers_public_tls"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_sasl_iam", resourceName, "bootstrap_brokers_sasl_iam"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_sasl_scram", resourceName, "bootstrap_brokers_sasl_scram"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_tls", resourceName, "bootstrap_brokers_tls"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_vpc_connectivity_sasl_iam", resourceName, "bootstrap_brokers_vpc_connectivity_sasl_iam"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_vpc_connectivity_sasl_scram", resourceName, "bootstrap_brokers_vpc_connectivity_sasl_scram"),
resource.TestCheckResourceAttrPair(dataSourceName, "bootstrap_brokers_vpc_connectivity_tls", resourceName, "bootstrap_brokers_vpc_connectivity_tls"),
),
},
},
})
}

func testAccBootstrapBrokersDataSourceConfig_basic(rName string) string {
return acctest.ConfigCompose(testAccClusterConfig_base(rName), fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = "2.8.1"
number_of_broker_nodes = 3

broker_node_group_info {
client_subnets = aws_subnet.test[*].id
instance_type = "kafka.t3.small"
security_groups = [aws_security_group.test.id]

storage_info {
ebs_storage_info {
volume_size = 10
}
}
}

tags = {
Name = %[1]q
}
}

data "aws_msk_bootstrap_brokers" "test" {
cluster_arn = aws_msk_cluster.test.arn
}
`, rName))
}
59 changes: 27 additions & 32 deletions internal/service/kafka/broker_nodes_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (
"context"
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kafka"
"github.com/aws/aws-sdk-go-v2/service/kafka/types"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-provider-aws/internal/conns"
"github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
"github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// @SDKDataSource("aws_msk_broker_nodes")
func DataSourceBrokerNodes() *schema.Resource {
// @SDKDataSource("aws_msk_broker_nodes", name="Broker Nodes")
func dataSourceBrokerNodes() *schema.Resource {
return &schema.Resource{
ReadWithoutTimeout: dataSourceBrokerNodesRead,

Expand All @@ -27,7 +28,6 @@ func DataSourceBrokerNodes() *schema.Resource {
Required: true,
ValidateFunc: verify.ValidARN,
},

"node_info_list": {
Type: schema.TypeList,
Computed: true,
Expand Down Expand Up @@ -67,53 +67,48 @@ func DataSourceBrokerNodes() *schema.Resource {

func dataSourceBrokerNodesRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
var diags diag.Diagnostics
conn := meta.(*conns.AWSClient).KafkaConn(ctx)
conn := meta.(*conns.AWSClient).KafkaClient(ctx)

clusterARN := d.Get("cluster_arn").(string)
input := &kafka.ListNodesInput{
ClusterArn: aws.String(clusterARN),
}
var nodeInfos []*kafka.NodeInfo
var nodeInfos []types.NodeInfo

pages := kafka.NewListNodesPaginator(conn, input)
for pages.HasMorePages() {
page, err := pages.NextPage(ctx)

err := conn.ListNodesPagesWithContext(ctx, input, func(page *kafka.ListNodesOutput, lastPage bool) bool {
if page == nil {
return !lastPage
if err != nil {
return sdkdiag.AppendErrorf(diags, "listing MSK Cluster (%s) Broker Nodes: %s", clusterARN, err)
}

nodeInfos = append(nodeInfos, page.NodeInfoList...)

return !lastPage
})

if err != nil {
return sdkdiag.AppendErrorf(diags, "listing MSK Cluster (%s) Broker Nodes: %s", clusterARN, err)
}

// node list is returned unsorted sort on broker id
sort.Slice(nodeInfos, func(i, j int) bool {
iBrokerId := aws.Float64Value(nodeInfos[i].BrokerNodeInfo.BrokerId)
jBrokerId := aws.Float64Value(nodeInfos[j].BrokerNodeInfo.BrokerId)
iBrokerId := aws.ToFloat64(nodeInfos[i].BrokerNodeInfo.BrokerId)
jBrokerId := aws.ToFloat64(nodeInfos[j].BrokerNodeInfo.BrokerId)
return iBrokerId < jBrokerId
})

tfList := make([]interface{}, len(nodeInfos))

for i, apiObject := range nodeInfos {
brokerNodeInfo := apiObject.BrokerNodeInfo
tfMap := map[string]interface{}{
"attached_eni_id": aws.StringValue(brokerNodeInfo.AttachedENIId),
"broker_id": aws.Float64Value(brokerNodeInfo.BrokerId),
"client_subnet": aws.StringValue(brokerNodeInfo.ClientSubnet),
"client_vpc_ip_address": aws.StringValue(brokerNodeInfo.ClientVpcIpAddress),
"endpoints": aws.StringValueSlice(brokerNodeInfo.Endpoints),
"node_arn": aws.StringValue(apiObject.NodeARN),
tfList := []interface{}{}
for _, apiObject := range nodeInfos {
if brokerNodeInfo := apiObject.BrokerNodeInfo; brokerNodeInfo != nil {
tfMap := map[string]interface{}{
"attached_eni_id": aws.ToString(brokerNodeInfo.AttachedENIId),
"broker_id": aws.ToFloat64(brokerNodeInfo.BrokerId),
"client_subnet": aws.ToString(brokerNodeInfo.ClientSubnet),
"client_vpc_ip_address": aws.ToString(brokerNodeInfo.ClientVpcIpAddress),
"endpoints": brokerNodeInfo.Endpoints,
"node_arn": aws.ToString(apiObject.NodeARN),
}
tfList = append(tfList, tfMap)
}

tfList[i] = tfMap
}

d.SetId(clusterARN)

if err := d.Set("node_info_list", tfList); err != nil {
return sdkdiag.AppendErrorf(diags, "setting node_info_list: %s", err)
}
Expand Down
5 changes: 2 additions & 3 deletions internal/service/kafka/broker_nodes_data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"fmt"
"testing"

"github.com/aws/aws-sdk-go/service/kafka"
sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-provider-aws/internal/acctest"
"github.com/hashicorp/terraform-provider-aws/names"
)

func TestAccKafkaBrokerNodesDataSource_basic(t *testing.T) {
Expand All @@ -21,9 +21,8 @@ func TestAccKafkaBrokerNodesDataSource_basic(t *testing.T) {

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t); testAccPreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, kafka.EndpointsID),
ErrorCheck: acctest.ErrorCheck(t, names.KafkaEndpointID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckClusterDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccBrokerNodesDataSourceConfig_basic(rName),
Expand Down
Loading
Loading