Skip to content

Commit

Permalink
GH-12186 aws_msk_cluster: support Logging Info
Browse files Browse the repository at this point in the history
  • Loading branch information
arafsheikh committed Mar 19, 2020
1 parent 023d4f0 commit 6a09acb
Show file tree
Hide file tree
Showing 3 changed files with 473 additions and 3 deletions.
227 changes: 226 additions & 1 deletion aws/resource_aws_msk_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,83 @@ func resourceAwsMskCluster() *schema.Resource {
},
},
},
"logging_info": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"broker_logs": {
Type: schema.TypeList,
Required: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"cloudwatch_logs": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Required: true,
},
"log_group": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
"firehose": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Required: true,
},
"delivery_stream": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
"s3": {
Type: schema.TypeList,
Optional: true,
DiffSuppressFunc: suppressMissingOptionalConfigurationBlock,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Required: true,
},
"bucket": {
Type: schema.TypeString,
Optional: true,
},
"prefix": {
Type: schema.TypeString,
Optional: true,
},
},
},
},
},
},
},
},
},
},
"tags": tagsSchema(),
"zookeeper_connect_string": {
Type: schema.TypeString,
Expand All @@ -274,6 +351,7 @@ func resourceAwsMskClusterCreate(d *schema.ResourceData, meta interface{}) error
KafkaVersion: aws.String(d.Get("kafka_version").(string)),
NumberOfBrokerNodes: aws.Int64(int64(d.Get("number_of_broker_nodes").(int))),
OpenMonitoring: expandMskOpenMonitoring(d.Get("open_monitoring").([]interface{})),
LoggingInfo: expandMskLoggingInfo(d.Get("logging_info").([]interface{})),
Tags: keyvaluetags.New(d.Get("tags").(map[string]interface{})).IgnoreAws().KafkaTags(),
}

Expand Down Expand Up @@ -393,6 +471,10 @@ func resourceAwsMskClusterRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("error setting open_monitoring: %s", err)
}

if err := d.Set("logging_info", flattenMskLoggingInfo(cluster.LoggingInfo)); err != nil {
return fmt.Errorf("error setting logging_info: %s", err)
}

d.Set("zookeeper_connect_string", aws.StringValue(cluster.ZookeeperConnectString))

return nil
Expand Down Expand Up @@ -454,12 +536,13 @@ func resourceAwsMskClusterUpdate(d *schema.ResourceData, meta interface{}) error
}
}

if d.HasChange("enhanced_monitoring") || d.HasChange("open_monitoring") {
if d.HasChange("enhanced_monitoring") || d.HasChange("open_monitoring") || d.HasChange("logging_info") {
input := &kafka.UpdateMonitoringInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
EnhancedMonitoring: aws.String(d.Get("enhanced_monitoring").(string)),
OpenMonitoring: expandMskOpenMonitoring(d.Get("open_monitoring").([]interface{})),
LoggingInfo: expandMskLoggingInfo(d.Get("logging_info").([]interface{})),
}

output, err := conn.UpdateMonitoring(input)
Expand Down Expand Up @@ -672,6 +755,82 @@ func expandMskOpenMonitoringPrometheusNodeExporter(l []interface{}) *kafka.NodeE
return nodeExporter
}

func expandMskLoggingInfo(l []interface{}) *kafka.LoggingInfo {
if len(l) == 0 || l[0] == nil {
return nil
}

m := l[0].(map[string]interface{})

loggingInfo := &kafka.LoggingInfo{
BrokerLogs: expandMskLoggingInfoBrokerLogs(m["broker_logs"].([]interface{})),
}

return loggingInfo
}

func expandMskLoggingInfoBrokerLogs(l []interface{}) *kafka.BrokerLogs {
if len(l) == 0 || l[0] == nil {
return nil
}

m := l[0].(map[string]interface{})

brokerLogs := &kafka.BrokerLogs{
CloudWatchLogs: expandMskLoggingInfoBrokerLogsCloudWatchLogs(m["cloudwatch_logs"].([]interface{})),
Firehose: expandMskLoggingInfoBrokerLogsFirehose(m["firehose"].([]interface{})),
S3: expandMskLoggingInfoBrokerLogsS3(m["s3"].([]interface{})),
}

return brokerLogs
}

func expandMskLoggingInfoBrokerLogsCloudWatchLogs(l []interface{}) *kafka.CloudWatchLogs {
if len(l) == 0 || l[0] == nil {
return nil
}

m := l[0].(map[string]interface{})

cloudWatchLogs := &kafka.CloudWatchLogs{
Enabled: aws.Bool(m["enabled"].(bool)),
LogGroup: aws.String(m["log_group"].(string)),
}

return cloudWatchLogs
}

func expandMskLoggingInfoBrokerLogsFirehose(l []interface{}) *kafka.Firehose {
if len(l) == 0 || l[0] == nil {
return nil
}

m := l[0].(map[string]interface{})

firehose := &kafka.Firehose{
Enabled: aws.Bool(m["enabled"].(bool)),
DeliveryStream: aws.String(m["delivery_stream"].(string)),
}

return firehose
}

func expandMskLoggingInfoBrokerLogsS3(l []interface{}) *kafka.S3 {
if len(l) == 0 || l[0] == nil {
return nil
}

m := l[0].(map[string]interface{})

s3 := &kafka.S3{
Enabled: aws.Bool(m["enabled"].(bool)),
Bucket: aws.String(m["bucket"].(string)),
Prefix: aws.String(m["prefix"].(string)),
}

return s3
}

func flattenMskBrokerNodeGroupInfo(b *kafka.BrokerNodeGroupInfo) []map[string]interface{} {

if b == nil {
Expand Down Expand Up @@ -804,6 +963,72 @@ func flattenMskOpenMonitoringPrometheusNodeExporter(e *kafka.NodeExporter) []map
return []map[string]interface{}{m}
}

func flattenMskLoggingInfo(e *kafka.LoggingInfo) []map[string]interface{} {
if e == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"broker_logs": flattenMskLoggingInfoBrokerLogs(e.BrokerLogs),
}

return []map[string]interface{}{m}
}

func flattenMskLoggingInfoBrokerLogs(e *kafka.BrokerLogs) []map[string]interface{} {
if e == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"cloudwatch_logs": flattenMskLoggingInfoBrokerLogsCloudWatchLogs(e.CloudWatchLogs),
"firehose": flattenMskLoggingInfoBrokerLogsFirehose(e.Firehose),
"s3": flattenMskLoggingInfoBrokerLogsS3(e.S3),
}

return []map[string]interface{}{m}
}

func flattenMskLoggingInfoBrokerLogsCloudWatchLogs(e *kafka.CloudWatchLogs) []map[string]interface{} {
if e == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"enabled": aws.BoolValue(e.Enabled),
"log_group": aws.StringValue(e.LogGroup),
}

return []map[string]interface{}{m}
}

func flattenMskLoggingInfoBrokerLogsFirehose(e *kafka.Firehose) []map[string]interface{} {
if e == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"enabled": aws.BoolValue(e.Enabled),
"delivery_stream": aws.StringValue(e.DeliveryStream),
}

return []map[string]interface{}{m}
}

func flattenMskLoggingInfoBrokerLogsS3(e *kafka.S3) []map[string]interface{} {
if e == nil {
return []map[string]interface{}{}
}

m := map[string]interface{}{
"enabled": aws.BoolValue(e.Enabled),
"bucket": aws.StringValue(e.Bucket),
"prefix": aws.StringValue(e.Prefix),
}

return []map[string]interface{}{m}
}

func resourceAwsMskClusterDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kafkaconn

Expand Down
Loading

0 comments on commit 6a09acb

Please sign in to comment.