Skip to content

Commit

Permalink
resource/aws_msk_cluster: Support instance_type update (#17447)
Browse files Browse the repository at this point in the history
  • Loading branch information
shuheiktgw authored Apr 30, 2021
1 parent 19e8e24 commit e05fe3b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
25 changes: 24 additions & 1 deletion aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func resourceAwsMskCluster() *schema.Resource {
"instance_type": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"security_groups": {
Type: schema.TypeList,
Expand Down Expand Up @@ -546,6 +545,30 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("broker_node_group_info.0.instance_type") {
input := &kafka.UpdateBrokerTypeInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
TargetInstanceType: aws.String(d.Get("broker_node_group_info.0.instance_type").(string)),
}

output, err := conn.UpdateBrokerType(input)

if err != nil {
return fmt.Errorf("error updating MSK Cluster (%s) broker type: %s", d.Id(), err)
}

if output == nil {
return fmt.Errorf("error updating MSK Cluster (%s) broker type: empty response", d.Id())
}

clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

if err := waitForMskClusterOperation(conn, clusterOperationARN); err != nil {
return fmt.Errorf("error waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
}
}

if d.HasChange("number_of_broker_nodes") {
input := &kafka.UpdateBrokerCountInput{
ClusterArn: aws.String(d.Id()),
Expand Down
57 changes: 57 additions & 0 deletions aws/resource_aws_msk_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,46 @@ func TestAccAWSMskCluster_BrokerNodeGroupInfo_EbsVolumeSize(t *testing.T) {
})
}

func TestAccAWSMskCluster_BrokerNodeGroupInfo_InstanceType(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_msk_cluster.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t); testAccPreCheckAWSMsk(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckMskClusterDestroy,
Steps: []resource.TestStep{
{
Config: testAccMskClusterConfigBrokerNodeGroupInfoInstanceType(rName, "kafka.t3.small"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster1),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.instance_type", "kafka.t3.small"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"bootstrap_brokers", // API may mutate ordering and selection of brokers to return
"bootstrap_brokers_tls", // API may mutate ordering and selection of brokers to return
},
},
{
Config: testAccMskClusterConfigBrokerNodeGroupInfoInstanceType(rName, "kafka.m5.large"),
Check: resource.ComposeTestCheckFunc(
testAccCheckMskClusterExists(resourceName, &cluster2),
testAccCheckMskClusterNotRecreated(&cluster1, &cluster2),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.#", "1"),
resource.TestCheckResourceAttr(resourceName, "broker_node_group_info.0.instance_type", "kafka.m5.large"),
),
},
},
})
}

func TestAccAWSMskCluster_ClientAuthentication_Sasl_Scram(t *testing.T) {
var cluster1, cluster2 kafka.ClusterInfo
rName := acctest.RandomWithPrefix("tf-acc-test")
Expand Down Expand Up @@ -962,6 +1002,23 @@ resource "aws_msk_cluster" "test" {
`, rName, ebsVolumeSize)
}

func testAccMskClusterConfigBrokerNodeGroupInfoInstanceType(rName string, t string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = "2.2.1"
number_of_broker_nodes = 3
broker_node_group_info {
client_subnets = [aws_subnet.example_subnet_az1.id, aws_subnet.example_subnet_az2.id, aws_subnet.example_subnet_az3.id]
ebs_volume_size = 10
instance_type = %[2]q
security_groups = [aws_security_group.example_sg.id]
}
}
`, rName, t)
}

func testAccMskClusterConfigClientAuthenticationTlsCertificateAuthorityArns(rName string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
resource "aws_acmpca_certificate_authority" "test" {
Expand Down

0 comments on commit e05fe3b

Please sign in to comment.