Skip to content

Commit

Permalink
Merge pull request #3117 from WhileLoop/firehose-splunk-connector
Browse files Browse the repository at this point in the history
resource/aws_kinesis_firehose: add splunk configuration
  • Loading branch information
bflad authored Feb 6, 2018
2 parents 52b37e3 + 03a1141 commit 2710d65
Show file tree
Hide file tree
Showing 3 changed files with 308 additions and 18 deletions.
167 changes: 163 additions & 4 deletions aws/resource_aws_kinesis_firehose_delivery_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,22 @@ func flattenKinesisFirehoseDeliveryStream(d *schema.ResourceData, s *firehose.De
elasticsearchConfList[0] = elasticsearchConfiguration
d.Set("elasticsearch_configuration", elasticsearchConfList)
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.ElasticsearchDestinationDescription.S3DestinationDescription))
} else if destination.SplunkDestinationDescription != nil {
d.Set("destination", "splunk")

splunkConfiguration := map[string]interface{}{
"hec_acknowledgment_timeout": *destination.SplunkDestinationDescription.HECAcknowledgmentTimeoutInSeconds,
"hec_endpoint": *destination.SplunkDestinationDescription.HECEndpoint,
"hec_endpoint_type": *destination.SplunkDestinationDescription.HECEndpointType,
"hec_token": *destination.SplunkDestinationDescription.HECToken,
"s3_backup_mode": *destination.SplunkDestinationDescription.S3BackupMode,
"retry_duration": *destination.SplunkDestinationDescription.RetryOptions.DurationInSeconds,
"cloudwatch_logging_options": flattenCloudwatchLoggingOptions(*destination.SplunkDestinationDescription.CloudWatchLoggingOptions),
}
splunkConfList := make([]map[string]interface{}, 1)
splunkConfList[0] = splunkConfiguration
d.Set("splunk_configuration", splunkConfList)
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.SplunkDestinationDescription.S3DestinationDescription))
} else if d.Get("destination").(string) == "s3" {
d.Set("destination", "s3")
d.Set("s3_configuration", flattenFirehoseS3Configuration(*destination.S3DestinationDescription))
Expand Down Expand Up @@ -404,9 +420,9 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != "s3" && value != "extended_s3" && value != "redshift" && value != "elasticsearch" {
if value != "s3" && value != "extended_s3" && value != "redshift" && value != "elasticsearch" && value != "splunk" {
errors = append(errors, fmt.Errorf(
"%q must be one of 's3', 'extended_s3', 'redshift', 'elasticsearch'", k))
"%q must be one of 's3', 'extended_s3', 'redshift', 'elasticsearch', 'splunk'", k))
}
return
},
Expand Down Expand Up @@ -653,6 +669,71 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
},
},

"splunk_configuration": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"hec_acknowledgment_timeout": {
Type: schema.TypeInt,
Optional: true,
Default: 180,
ValidateFunc: validateIntegerInRange(180, 600),
},

"hec_endpoint": {
Type: schema.TypeString,
Required: true,
},

"hec_endpoint_type": {
Type: schema.TypeString,
Optional: true,
Default: firehose.HECEndpointTypeRaw,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != firehose.HECEndpointTypeRaw && value != firehose.HECEndpointTypeEvent {
errors = append(errors, fmt.Errorf(
"%q must be one of 'Raw', 'Event'", k))
}
return
},
},

"hec_token": {
Type: schema.TypeString,
Required: true,
},

"s3_backup_mode": {
Type: schema.TypeString,
Optional: true,
Default: firehose.SplunkS3BackupModeFailedEventsOnly,
ValidateFunc: func(v interface{}, k string) (ws []string, errors []error) {
value := v.(string)
if value != firehose.SplunkS3BackupModeFailedEventsOnly && value != firehose.SplunkS3BackupModeAllEvents {
errors = append(errors, fmt.Errorf(
"%q must be one of 'FailedEventsOnly', 'AllEvents'", k))
}
return
},
},

"retry_duration": {
Type: schema.TypeInt,
Optional: true,
Default: 3600,
ValidateFunc: validateIntegerInRange(0, 7200),
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),

"processing_configuration": processingConfigurationSchema(),
},
},
},

"arn": {
Type: schema.TypeString,
Optional: true,
Expand Down Expand Up @@ -1052,6 +1133,62 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest
return update, nil
}

func createSplunkConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.SplunkDestinationConfiguration, error) {
splunkRaw, ok := d.GetOk("splunk_configuration")
if !ok {
return nil, fmt.Errorf("[ERR] Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found")
}
sl := splunkRaw.([]interface{})

splunk := sl[0].(map[string]interface{})

configuration := &firehose.SplunkDestinationConfiguration{
HECToken: aws.String(splunk["hec_token"].(string)),
HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)),
HECEndpoint: aws.String(splunk["hec_endpoint"].(string)),
HECAcknowledgmentTimeoutInSeconds: aws.Int64(int64(splunk["hec_acknowledgment_timeout"].(int))),
RetryOptions: extractSplunkRetryOptions(splunk),
S3Configuration: s3Config,
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(splunk)
}
if s3BackupMode, ok := splunk["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
}

return configuration, nil
}

func updateSplunkConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.SplunkDestinationUpdate, error) {
splunkRaw, ok := d.GetOk("splunk_configuration")
if !ok {
return nil, fmt.Errorf("[ERR] Error loading Splunk Configuration for Kinesis Firehose: splunk_configuration not found")
}
sl := splunkRaw.([]interface{})

splunk := sl[0].(map[string]interface{})

configuration := &firehose.SplunkDestinationUpdate{
HECToken: aws.String(splunk["hec_token"].(string)),
HECEndpointType: aws.String(splunk["hec_endpoint_type"].(string)),
HECEndpoint: aws.String(splunk["hec_endpoint"].(string)),
HECAcknowledgmentTimeoutInSeconds: aws.Int64(int64(splunk["hec_acknowledgment_timeout"].(int))),
RetryOptions: extractSplunkRetryOptions(splunk),
S3Update: s3Update,
}

if _, ok := splunk["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(splunk)
}
if s3BackupMode, ok := splunk["s3_backup_mode"]; ok {
configuration.S3BackupMode = aws.String(s3BackupMode.(string))
}

return configuration, nil
}

func extractBufferingHints(es map[string]interface{}) *firehose.ElasticsearchBufferingHints {
bufferingHints := &firehose.ElasticsearchBufferingHints{}

Expand Down Expand Up @@ -1085,6 +1222,16 @@ func extractRedshiftRetryOptions(redshift map[string]interface{}) *firehose.Reds
return retryOptions
}

func extractSplunkRetryOptions(splunk map[string]interface{}) *firehose.SplunkRetryOptions {
retryOptions := &firehose.SplunkRetryOptions{}

if retryDuration, ok := splunk["retry_duration"].(int); ok {
retryOptions.DurationInSeconds = aws.Int64(int64(retryDuration))
}

return retryOptions
}

func extractCopyCommandConfiguration(redshift map[string]interface{}) *firehose.CopyCommand {
cmd := &firehose.CopyCommand{
DataTableName: aws.String(redshift["data_table_name"].(string)),
Expand Down Expand Up @@ -1136,12 +1283,18 @@ func resourceAwsKinesisFirehoseDeliveryStreamCreate(d *schema.ResourceData, meta
return err
}
createInput.ElasticsearchDestinationConfiguration = esConfig
} else {
} else if d.Get("destination").(string) == "redshift" {
rc, err := createRedshiftConfig(d, s3Config)
if err != nil {
return err
}
createInput.RedshiftDestinationConfiguration = rc
} else if d.Get("destination").(string) == "splunk" {
rc, err := createSplunkConfig(d, s3Config)
if err != nil {
return err
}
createInput.SplunkDestinationConfiguration = rc
}
}

Expand Down Expand Up @@ -1258,12 +1411,18 @@ func resourceAwsKinesisFirehoseDeliveryStreamUpdate(d *schema.ResourceData, meta
return err
}
updateInput.ElasticsearchDestinationUpdate = esUpdate
} else {
} else if d.Get("destination").(string) == "redshift" {
rc, err := updateRedshiftConfig(d, s3Config)
if err != nil {
return err
}
updateInput.RedshiftDestinationUpdate = rc
} else if d.Get("destination").(string) == "splunk" {
rc, err := updateSplunkConfig(d, s3Config)
if err != nil {
return err
}
updateInput.SplunkDestinationUpdate = rc
}
}

Expand Down
Loading

0 comments on commit 2710d65

Please sign in to comment.