Skip to content

Commit

Permalink
Add autoscaling policy on EMR instance groups hashicorp#713
Browse files Browse the repository at this point in the history
  • Loading branch information
darrenhaken committed Jan 10, 2018
1 parent 49dfcdc commit f0e077d
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 29 deletions.
97 changes: 68 additions & 29 deletions aws/resource_aws_emr_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ func resourceAwsEMRCluster() *schema.Resource {
Optional: true,
Default: 0,
},
"autoscaling_policy": {
Type: schema.TypeString,
Optional: true,
DiffSuppressFunc: suppressEquivalentAwsPolicyDiffs,
ValidateFunc: validateJsonString,
},
"instance_role": {
Type: schema.TypeString,
Required: true,
Expand Down Expand Up @@ -724,6 +730,13 @@ func flattenInstanceGroups(igs []*emr.InstanceGroup) []map[string]interface{} {
attrs["instance_count"] = *ig.RequestedInstanceCount
attrs["instance_role"] = *ig.InstanceGroupType
attrs["instance_type"] = *ig.InstanceType

if ig.AutoScalingPolicy != nil {
attrs["autoscaling_policy"] = *ig.AutoScalingPolicy
} else {
attrs["autoscaling_policy"] = ""
}

attrs["name"] = *ig.Name
result = append(result, attrs)
}
Expand Down Expand Up @@ -871,7 +884,7 @@ func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActi
}

func expandInstanceGroupConfigs(instanceGroupConfigs []interface{}) []*emr.InstanceGroupConfig {
configsOut := []*emr.InstanceGroupConfig{}
instanceGroupConfig := []*emr.InstanceGroupConfig{}

for _, raw := range instanceGroupConfigs {
configAttributes := raw.(map[string]interface{})
Expand All @@ -886,42 +899,68 @@ func expandInstanceGroupConfigs(instanceGroupConfigs []interface{}) []*emr.Insta
InstanceCount: aws.Int64(int64(configInstanceCount)),
}

if bidPrice, ok := configAttributes["bid_price"]; ok {
if bidPrice != "" {
config.BidPrice = aws.String(bidPrice.(string))
config.Market = aws.String("SPOT")
} else {
config.Market = aws.String("ON_DEMAND")
applyBidPrice(config, configAttributes)
applyEbsConfig(configAttributes, config)
applyAutoScalingPolicy(configAttributes, config)

instanceGroupConfig = append(instanceGroupConfig, config)
}

return instanceGroupConfig
}

func applyBidPrice(config *emr.InstanceGroupConfig, configAttributes map[string]interface{}) {
if bidPrice, ok := configAttributes["bid_price"]; ok {
if bidPrice != "" {
config.BidPrice = aws.String(bidPrice.(string))
config.Market = aws.String("SPOT")
} else {
config.Market = aws.String("ON_DEMAND")
}
}
}

func applyEbsConfig(configAttributes map[string]interface{}, config *emr.InstanceGroupConfig) {
if rawEbsConfigs, ok := configAttributes["ebs_config"]; ok {
ebsConfig := &emr.EbsConfiguration{}

ebsBlockDeviceConfigs := make([]*emr.EbsBlockDeviceConfig, 0)
for _, rawEbsConfig := range rawEbsConfigs.(*schema.Set).List() {
rawEbsConfig := rawEbsConfig.(map[string]interface{})
ebsBlockDeviceConfig := &emr.EbsBlockDeviceConfig{
VolumesPerInstance: aws.Int64(int64(rawEbsConfig["volumes_per_instance"].(int))),
VolumeSpecification: &emr.VolumeSpecification{
SizeInGB: aws.Int64(int64(rawEbsConfig["size"].(int))),
VolumeType: aws.String(rawEbsConfig["type"].(string)),
},
}
if v, ok := rawEbsConfig["iops"].(int); ok && v != 0 {
ebsBlockDeviceConfig.VolumeSpecification.Iops = aws.Int64(int64(v))
}
ebsBlockDeviceConfigs = append(ebsBlockDeviceConfigs, ebsBlockDeviceConfig)
}
ebsConfig.EbsBlockDeviceConfigs = ebsBlockDeviceConfigs

if rawEbsConfigs, ok := configAttributes["ebs_config"]; ok {
ebsConfig := &emr.EbsConfiguration{}
config.EbsConfiguration = ebsConfig
}
}

ebsBlockDeviceConfigs := make([]*emr.EbsBlockDeviceConfig, 0)
for _, rawEbsConfig := range rawEbsConfigs.(*schema.Set).List() {
rawEbsConfig := rawEbsConfig.(map[string]interface{})
ebsBlockDeviceConfig := &emr.EbsBlockDeviceConfig{
VolumesPerInstance: aws.Int64(int64(rawEbsConfig["volumes_per_instance"].(int))),
VolumeSpecification: &emr.VolumeSpecification{
SizeInGB: aws.Int64(int64(rawEbsConfig["size"].(int))),
VolumeType: aws.String(rawEbsConfig["type"].(string)),
},
}
if v, ok := rawEbsConfig["iops"].(int); ok && v != 0 {
ebsBlockDeviceConfig.VolumeSpecification.Iops = aws.Int64(int64(v))
}
ebsBlockDeviceConfigs = append(ebsBlockDeviceConfigs, ebsBlockDeviceConfig)
}
ebsConfig.EbsBlockDeviceConfigs = ebsBlockDeviceConfigs
func applyAutoScalingPolicy(configAttributes map[string]interface{}, config *emr.InstanceGroupConfig) {
if rawAutoScalingPolicy, ok := configAttributes["autoscaling_policy"]; ok {
autoScalingConfig, _ := expandAutoScalingPolicy(rawAutoScalingPolicy.(string))
config.AutoScalingPolicy = autoScalingConfig
}
}

config.EbsConfiguration = ebsConfig
}
func expandAutoScalingPolicy(rawDefinitions string) (*emr.AutoScalingPolicy, error) {
var policy *emr.AutoScalingPolicy

configsOut = append(configsOut, config)
err := json.Unmarshal([]byte(rawDefinitions), &policy)
if err != nil {
return nil, fmt.Errorf("Error decoding JSON: %s", err)
}

return configsOut
return policy, nil
}

func expandConfigures(input string) []*emr.Configuration {
Expand Down
33 changes: 33 additions & 0 deletions aws/resource_aws_emr_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,39 @@ resource "aws_emr_cluster" "tf-test-cluster" {
volumes_per_instance = 1
}
bid_price = "0.30"
autoscaling_policy = <<EOT
{
"Constraints": {
"MinCapacity": 1,
"MaxCapacity": 2
},
"Rules": [
{
"Name": "ScaleOutMemoryPercentage",
"Description": "Scale out if YARNMemoryAvailablePercentage is less than 15",
"Action": {
"SimpleScalingPolicyConfiguration": {
"AdjustmentType": "CHANGE_IN_CAPACITY",
"ScalingAdjustment": 1,
"CoolDown": 300
}
},
"Trigger": {
"CloudWatchAlarmDefinition": {
"ComparisonOperator": "LESS_THAN",
"EvaluationPeriods": 1,
"MetricName": "YARNMemoryAvailablePercentage",
"Namespace": "AWS/ElasticMapReduce",
"Period": 300,
"Statistic": "AVERAGE",
"Threshold": 15.0,
"Unit": "PERCENT"
}
}
}
]
}
EOT
},
{
instance_role = "MASTER"
Expand Down

0 comments on commit f0e077d

Please sign in to comment.