Skip to content

Commit

Permalink
Merge pull request #19404 from hcourse-nydig/f-aws_msk_cluster-iam-client-auth
Browse files Browse the repository at this point in the history

Add support for MSK IAM client authentication
  • Loading branch information
YakDriver authored May 26, 2021
2 parents 203e17a + 9b6a2e2 commit 825c48a
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 161 deletions.
7 changes: 7 additions & 0 deletions .changelog/19404.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:enhancement
data-source/aws_msk_cluster: Add `bootstrap_brokers_sasl_iam` attribute
```

```release-note:enhancement
resource/aws_msk_cluster: Add `bootstrap_brokers_sasl_iam` argument
```
5 changes: 5 additions & 0 deletions aws/data_source_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func dataSourceAwsMskCluster() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_sasl_iam": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_sasl_scram": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -104,6 +108,7 @@ func dataSourceAwsMskClusterRead(d *schema.ResourceData, meta interface{}) error

d.Set("arn", cluster.ClusterArn)
d.Set("bootstrap_brokers", sortMskClusterEndpoints(aws.StringValue(bootstrapBrokersOutput.BootstrapBrokerString)))
d.Set("bootstrap_brokers_sasl_iam", sortMskClusterEndpoints(aws.StringValue(bootstrapBrokersOutput.BootstrapBrokerStringSaslIam)))
d.Set("bootstrap_brokers_sasl_scram", sortMskClusterEndpoints(aws.StringValue(bootstrapBrokersOutput.BootstrapBrokerStringSaslScram)))
d.Set("bootstrap_brokers_tls", sortMskClusterEndpoints(aws.StringValue(bootstrapBrokersOutput.BootstrapBrokerStringTls)))
d.Set("cluster_name", cluster.ClusterName)
Expand Down
4 changes: 2 additions & 2 deletions aws/data_source_aws_msk_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestAccAWSMskClusterDataSource_Name(t *testing.T) {
}

func testAccMskClusterDataSourceConfigName(rName string) string {
return testAccMskClusterBaseConfig() + fmt.Sprintf(`
return composeConfig(testAccMskClusterBaseConfig(rName), fmt.Sprintf(`
resource "aws_msk_cluster" "test" {
cluster_name = %[1]q
kafka_version = "2.2.1"
Expand All @@ -60,5 +60,5 @@ resource "aws_msk_cluster" "test" {
data "aws_msk_cluster" "test" {
cluster_name = aws_msk_cluster.test.cluster_name
}
`, rName)
`, rName))
}
11 changes: 11 additions & 0 deletions aws/internal/service/msk/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Package waiter defines shared timeout values used when waiting on
// AWS MSK cluster lifecycle operations.
package waiter

import (
	"time"
)

// Default maximum durations to wait for an MSK cluster operation to
// reach a terminal state. MSK cluster create/update/delete calls are
// long-running, hence the generous two-hour defaults.
const (
	ClusterCreateTimeout = 120 * time.Minute
	ClusterUpdateTimeout = 120 * time.Minute
	ClusterDeleteTimeout = 120 * time.Minute
)
104 changes: 71 additions & 33 deletions aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"log"
"sort"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kafka"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/keyvaluetags"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/msk/waiter"
)

func resourceAwsMskCluster() *schema.Resource {
Expand All @@ -26,6 +26,13 @@ func resourceAwsMskCluster() *schema.Resource {
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Timeouts: &schema.ResourceTimeout{
Create: schema.DefaultTimeout(waiter.ClusterCreateTimeout),
Update: schema.DefaultTimeout(waiter.ClusterUpdateTimeout),
Delete: schema.DefaultTimeout(waiter.ClusterDeleteTimeout),
},

CustomizeDiff: customdiff.Sequence(
customdiff.ForceNewIfChange("kafka_version", func(_ context.Context, old, new, meta interface{}) bool {
return new.(string) < old.(string)
Expand All @@ -41,6 +48,10 @@ func resourceAwsMskCluster() *schema.Resource {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_sasl_iam": {
Type: schema.TypeString,
Computed: true,
},
"bootstrap_brokers_sasl_scram": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -112,6 +123,11 @@ func resourceAwsMskCluster() *schema.Resource {
Optional: true,
ForceNew: true,
},
"iam": {
Type: schema.TypeBool,
Optional: true,
ForceNew: true,
},
},
},
ConflictsWith: []string{"client_authentication.0.tls"},
Expand Down Expand Up @@ -399,7 +415,7 @@ func waitForMskClusterCreation(conn *kafka.Kafka, arn string) error {
input := &kafka.DescribeClusterInput{
ClusterArn: aws.String(arn),
}
err := resource.Retry(60*time.Minute, func() *resource.RetryError {
err := resource.Retry(waiter.ClusterCreateTimeout, func() *resource.RetryError {
out, err := conn.DescribeCluster(input)
if err != nil {
return resource.NonRetryableError(err)
Expand Down Expand Up @@ -462,6 +478,7 @@ func resourceAwsMskClusterRead(d *schema.ResourceData, meta interface{}) error {

d.Set("arn", cluster.ClusterArn)
d.Set("bootstrap_brokers", sortMskClusterEndpoints(aws.StringValue(brokerOut.BootstrapBrokerString)))
d.Set("bootstrap_brokers_sasl_iam", sortMskClusterEndpoints(aws.StringValue(brokerOut.BootstrapBrokerStringSaslIam)))
d.Set("bootstrap_brokers_sasl_scram", sortMskClusterEndpoints(aws.StringValue(brokerOut.BootstrapBrokerStringSaslScram)))
d.Set("bootstrap_brokers_tls", sortMskClusterEndpoints(aws.StringValue(brokerOut.BootstrapBrokerStringTls)))

Expand Down Expand Up @@ -680,7 +697,25 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}

return resourceAwsMskClusterRead(d, meta)
}

// resourceAwsMskClusterDelete deletes the MSK cluster identified by the
// resource ID (its ARN) and then blocks until the deletion completes.
// A NotFound error from the API is treated as success, making the
// delete idempotent.
func resourceAwsMskClusterDelete(d *schema.ResourceData, meta interface{}) error {
	conn := meta.(*AWSClient).kafkaconn
	arn := d.Id()

	log.Printf("[DEBUG] Deleting MSK cluster: %q", arn)
	input := &kafka.DeleteClusterInput{
		ClusterArn: aws.String(arn),
	}
	if _, err := conn.DeleteCluster(input); err != nil {
		// The cluster is already gone; nothing left to delete.
		if isAWSErr(err, kafka.ErrCodeNotFoundException, "") {
			return nil
		}
		return fmt.Errorf("failed deleting MSK cluster %q: %s", arn, err)
	}

	log.Printf("[DEBUG] Waiting for MSK cluster %q to be deleted", arn)

	return resourceAwsMskClusterDeleteWaiter(conn, arn)
}

func expandMskClusterBrokerNodeGroupInfo(l []interface{}) *kafka.BrokerNodeGroupInfo {
Expand Down Expand Up @@ -712,9 +747,14 @@ func expandMskClusterClientAuthentication(l []interface{}) *kafka.ClientAuthenti

m := l[0].(map[string]interface{})

ca := &kafka.ClientAuthentication{
Sasl: expandMskClusterScram(m["sasl"].([]interface{})),
Tls: expandMskClusterTls(m["tls"].([]interface{})),
ca := &kafka.ClientAuthentication{}

if v, ok := m["sasl"].([]interface{}); ok {
ca.Sasl = expandMskClusterSasl(v)
}

if v, ok := m["tls"].([]interface{}); ok {
ca.Tls = expandMskClusterTls(v)
}

return ca
Expand Down Expand Up @@ -770,7 +810,7 @@ func expandMskClusterEncryptionInTransit(l []interface{}) *kafka.EncryptionInTra
return eit
}

func expandMskClusterScram(l []interface{}) *kafka.Sasl {
func expandMskClusterSasl(l []interface{}) *kafka.Sasl {
if len(l) == 0 || l[0] == nil {
return nil
}
Expand All @@ -780,10 +820,18 @@ func expandMskClusterScram(l []interface{}) *kafka.Sasl {
return nil
}

sasl := &kafka.Sasl{
Scram: &kafka.Scram{
Enabled: aws.Bool(tfMap["scram"].(bool)),
},
sasl := &kafka.Sasl{}

if v, ok := tfMap["scram"].(bool); ok {
sasl.Scram = &kafka.Scram{
Enabled: aws.Bool(v),
}
}

if v, ok := tfMap["iam"].(bool); ok {
sasl.Iam = &kafka.Iam{
Enabled: aws.Bool(v),
}
}

return sasl
Expand Down Expand Up @@ -1014,20 +1062,29 @@ func flattenMskSasl(sasl *kafka.Sasl) []map[string]interface{} {
}

m := map[string]interface{}{
"scram": flattenMskScram(sasl.Scram),
"scram": flattenMskSaslScram(sasl.Scram),
"iam": flattenMskSaslIam(sasl.Iam),
}

return []map[string]interface{}{m}
}

func flattenMskScram(scram *kafka.Scram) bool {
// flattenMskSaslScram reports whether SASL/SCRAM client authentication
// is enabled in the API response; a nil block is treated as disabled.
func flattenMskSaslScram(scram *kafka.Scram) bool {
	return scram != nil && aws.BoolValue(scram.Enabled)
}

// flattenMskSaslIam reports whether SASL/IAM client authentication is
// enabled in the API response; a nil block is treated as disabled.
func flattenMskSaslIam(iam *kafka.Iam) bool {
	return iam != nil && aws.BoolValue(iam.Enabled)
}

func flattenMskTls(tls *kafka.Tls) []map[string]interface{} {
if tls == nil {
return []map[string]interface{}{}
Expand Down Expand Up @@ -1155,30 +1212,11 @@ func flattenMskLoggingInfoBrokerLogsS3(e *kafka.S3) []map[string]interface{} {
return []map[string]interface{}{m}
}

func resourceAwsMskClusterDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kafkaconn

log.Printf("[DEBUG] Deleting MSK cluster: %q", d.Id())
_, err := conn.DeleteCluster(&kafka.DeleteClusterInput{
ClusterArn: aws.String(d.Id()),
})
if err != nil {
if isAWSErr(err, kafka.ErrCodeNotFoundException, "") {
return nil
}
return fmt.Errorf("failed deleting MSK cluster %q: %s", d.Id(), err)
}

log.Printf("[DEBUG] Waiting for MSK cluster %q to be deleted", d.Id())

return resourceAwsMskClusterDeleteWaiter(conn, d.Id())
}

func resourceAwsMskClusterDeleteWaiter(conn *kafka.Kafka, arn string) error {
input := &kafka.DescribeClusterInput{
ClusterArn: aws.String(arn),
}
err := resource.Retry(60*time.Minute, func() *resource.RetryError {
err := resource.Retry(waiter.ClusterDeleteTimeout, func() *resource.RetryError {
_, err := conn.DescribeCluster(input)

if err != nil {
Expand Down Expand Up @@ -1235,7 +1273,7 @@ func waitForMskClusterOperation(conn *kafka.Kafka, clusterOperationARN string) e
Pending: []string{"PENDING", "UPDATE_IN_PROGRESS"},
Target: []string{"UPDATE_COMPLETE"},
Refresh: mskClusterOperationRefreshFunc(conn, clusterOperationARN),
Timeout: 2 * time.Hour,
Timeout: waiter.ClusterUpdateTimeout,
}

log.Printf("[DEBUG] Waiting for MSK Cluster Operation (%s) completion", clusterOperationARN)
Expand Down
Loading

0 comments on commit 825c48a

Please sign in to comment.