From 6a5a20b88e2e58abc11030c79adb332b56aef7ce Mon Sep 17 00:00:00 2001 From: Brian Flad Date: Thu, 8 Mar 2018 08:21:01 -0500 Subject: [PATCH] resource/aws_emr_cluster: Add step support --- aws/resource_aws_emr_cluster.go | 214 ++++++++++++++++++++-- aws/resource_aws_emr_cluster_test.go | 218 ++++++++++++++++++++++- website/docs/r/emr_cluster.html.markdown | 46 +++++ 3 files changed, 459 insertions(+), 19 deletions(-) diff --git a/aws/resource_aws_emr_cluster.go b/aws/resource_aws_emr_cluster.go index 2beedb6744f..1c12cf94501 100644 --- a/aws/resource_aws_emr_cluster.go +++ b/aws/resource_aws_emr_cluster.go @@ -264,6 +264,63 @@ func resourceAwsEMRCluster() *schema.Resource { }, }, }, + "step": { + Type: schema.TypeList, + Optional: true, + Computed: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "action_on_failure": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validation.StringInSlice([]string{ + emr.ActionOnFailureCancelAndWait, + emr.ActionOnFailureContinue, + emr.ActionOnFailureTerminateCluster, + emr.ActionOnFailureTerminateJobFlow, + }, false), + }, + "hadoop_jar_step": { + Type: schema.TypeList, + MaxItems: 1, + Required: true, + ForceNew: true, + Elem: &schema.Resource{ + Schema: map[string]*schema.Schema{ + "args": { + Type: schema.TypeList, + Optional: true, + ForceNew: true, + Elem: &schema.Schema{Type: schema.TypeString}, + }, + "jar": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + "main_class": { + Type: schema.TypeString, + Optional: true, + ForceNew: true, + }, + "properties": { + Type: schema.TypeMap, + Optional: true, + ForceNew: true, + }, + }, + }, + }, + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + }, + }, + }, "tags": tagsSchema(), "configurations": { Type: schema.TypeString, @@ -441,6 +498,10 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error bootstrapActions := 
v.(*schema.Set).List() params.BootstrapActions = expandBootstrapActions(bootstrapActions) } + if v, ok := d.GetOk("step"); ok { + steps := v.([]interface{}) + params.Steps = expandEmrStepConfigs(steps) + } if v, ok := d.GetOk("tags"); ok { tagsIn := v.(map[string]interface{}) params.Tags = expandTags(tagsIn) @@ -482,8 +543,14 @@ func resourceAwsEMRClusterCreate(d *schema.ResourceData, meta interface{}) error log.Println("[INFO] Waiting for EMR Cluster to be available") stateConf := &resource.StateChangeConf{ - Pending: []string{"STARTING", "BOOTSTRAPPING"}, - Target: []string{"WAITING", "RUNNING"}, + Pending: []string{ + emr.ClusterStateBootstrapping, + emr.ClusterStateStarting, + }, + Target: []string{ + emr.ClusterStateRunning, + emr.ClusterStateWaiting, + }, Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), Timeout: 75 * time.Minute, MinTimeout: 10 * time.Second, @@ -519,19 +586,15 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { cluster := resp.Cluster if cluster.Status != nil { - if *cluster.Status.State == "TERMINATED" { - log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED already", d.Id()) - d.SetId("") - return nil - } + state := aws.StringValue(cluster.Status.State) - if *cluster.Status.State == "TERMINATED_WITH_ERRORS" { - log.Printf("[DEBUG] EMR Cluster (%s) was TERMINATED_WITH_ERRORS already", d.Id()) + if state == emr.ClusterStateTerminated || state == emr.ClusterStateTerminatedWithErrors { + log.Printf("[WARN] EMR Cluster (%s) was %s already, removing from state", d.Id(), state) d.SetId("") return nil } - d.Set("cluster_state", cluster.Status.State) + d.Set("cluster_state", state) } instanceGroups, err := fetchAllEMRInstanceGroups(emrconn, d.Id()) @@ -591,6 +654,24 @@ func resourceAwsEMRClusterRead(d *schema.ResourceData, meta interface{}) error { log.Printf("[WARN] Error setting Bootstrap Actions: %s", err) } + var stepSummaries []*emr.StepSummary + listStepsInput := &emr.ListStepsInput{ + ClusterId: 
aws.String(d.Id()), + } + err = emrconn.ListStepsPages(listStepsInput, func(page *emr.ListStepsOutput, lastPage bool) bool { + // ListSteps returns steps in reverse order (newest first) + for _, step := range page.Steps { + stepSummaries = append([]*emr.StepSummary{step}, stepSummaries...) + } + return !lastPage + }) + if err != nil { + return fmt.Errorf("error listing steps: %s", err) + } + if err := d.Set("step", flattenEmrStepSummaries(stepSummaries)); err != nil { + return fmt.Errorf("error setting step: %s", err) + } + return nil } @@ -633,8 +714,14 @@ func resourceAwsEMRClusterUpdate(d *schema.ResourceData, meta interface{}) error log.Println("[INFO] Waiting for EMR Cluster to be available") stateConf := &resource.StateChangeConf{ - Pending: []string{"STARTING", "BOOTSTRAPPING"}, - Target: []string{"WAITING", "RUNNING"}, + Pending: []string{ + emr.ClusterStateBootstrapping, + emr.ClusterStateStarting, + }, + Target: []string{ + emr.ClusterStateRunning, + emr.ClusterStateWaiting, + }, Refresh: resourceAwsEMRClusterStateRefreshFunc(d, meta), Timeout: 40 * time.Minute, MinTimeout: 10 * time.Second, @@ -718,7 +805,7 @@ func resourceAwsEMRClusterDelete(d *schema.ResourceData, meta interface{}) error var terminated []string for j, i := range resp.Instances { if i.Status != nil { - if *i.Status.State == "TERMINATED" { + if aws.StringValue(i.Status.State) == emr.InstanceStateTerminated { terminated = append(terminated, *i.Ec2InstanceId) } } else { @@ -833,6 +920,49 @@ func flattenEmrKerberosAttributes(d *schema.ResourceData, kerberosAttributes *em return l } +func flattenEmrHadoopStepConfig(config *emr.HadoopStepConfig) map[string]interface{} { + if config == nil { + return nil + } + + m := map[string]interface{}{ + "args": aws.StringValueSlice(config.Args), + "jar": aws.StringValue(config.Jar), + "main_class": aws.StringValue(config.MainClass), + "properties": aws.StringValueMap(config.Properties), + } + + return m +} + +func flattenEmrStepSummaries(stepSummaries 
[]*emr.StepSummary) []map[string]interface{} { + l := make([]map[string]interface{}, 0) + + if len(stepSummaries) == 0 { + return l + } + + for _, stepSummary := range stepSummaries { + l = append(l, flattenEmrStepSummary(stepSummary)) + } + + return l +} + +func flattenEmrStepSummary(stepSummary *emr.StepSummary) map[string]interface{} { + if stepSummary == nil { + return nil + } + + m := map[string]interface{}{ + "action_on_failure": aws.StringValue(stepSummary.ActionOnFailure), + "hadoop_jar_step": []map[string]interface{}{flattenEmrHadoopStepConfig(stepSummary.Config)}, + "name": aws.StringValue(stepSummary.Name), + } + + return m +} + func flattenInstanceGroups(igs []*emr.InstanceGroup) []map[string]interface{} { result := make([]map[string]interface{}, 0) @@ -1014,6 +1144,40 @@ func expandBootstrapActions(bootstrapActions []interface{}) []*emr.BootstrapActi return actionsOut } +func expandEmrHadoopJarStepConfig(m map[string]interface{}) *emr.HadoopJarStepConfig { + hadoopJarStepConfig := &emr.HadoopJarStepConfig{ + Jar: aws.String(m["jar"].(string)), + } + + if v, ok := m["args"]; ok { + hadoopJarStepConfig.Args = expandStringList(v.([]interface{})) + } + + if v, ok := m["main_class"]; ok { + hadoopJarStepConfig.MainClass = aws.String(v.(string)) + } + + if v, ok := m["properties"]; ok { + hadoopJarStepConfig.Properties = expandEmrKeyValues(v.(map[string]interface{})) + } + + return hadoopJarStepConfig +} + +func expandEmrKeyValues(m map[string]interface{}) []*emr.KeyValue { + keyValues := make([]*emr.KeyValue, 0) + + for k, v := range m { + keyValue := &emr.KeyValue{ + Key: aws.String(k), + Value: aws.String(v.(string)), + } + keyValues = append(keyValues, keyValue) + } + + return keyValues +} + func expandEmrKerberosAttributes(m map[string]interface{}) *emr.KerberosAttributes { kerberosAttributes := &emr.KerberosAttributes{ KdcAdminPassword: aws.String(m["kdc_admin_password"].(string)), @@ -1031,6 +1195,30 @@ func expandEmrKerberosAttributes(m 
map[string]interface{}) *emr.KerberosAttribut return kerberosAttributes } +func expandEmrStepConfig(m map[string]interface{}) *emr.StepConfig { + hadoopJarStepList := m["hadoop_jar_step"].([]interface{}) + hadoopJarStepMap := hadoopJarStepList[0].(map[string]interface{}) + + stepConfig := &emr.StepConfig{ + ActionOnFailure: aws.String(m["action_on_failure"].(string)), + HadoopJarStep: expandEmrHadoopJarStepConfig(hadoopJarStepMap), + Name: aws.String(m["name"].(string)), + } + + return stepConfig +} + +func expandEmrStepConfigs(l []interface{}) []*emr.StepConfig { + stepConfigs := []*emr.StepConfig{} + + for _, raw := range l { + m := raw.(map[string]interface{}) + stepConfigs = append(stepConfigs, expandEmrStepConfig(m)) + } + + return stepConfigs +} + func expandInstanceGroupConfigs(instanceGroupConfigs []interface{}) []*emr.InstanceGroupConfig { instanceGroupConfig := []*emr.InstanceGroupConfig{} diff --git a/aws/resource_aws_emr_cluster_test.go b/aws/resource_aws_emr_cluster_test.go index a80b0a2de9e..fc43a821e00 100644 --- a/aws/resource_aws_emr_cluster_test.go +++ b/aws/resource_aws_emr_cluster_test.go @@ -27,6 +27,7 @@ func TestAccAWSEMRCluster_basic(t *testing.T) { Check: resource.ComposeTestCheckFunc( testAccCheckAWSEmrClusterExists("aws_emr_cluster.tf-test-cluster", &cluster), resource.TestCheckResourceAttr("aws_emr_cluster.tf-test-cluster", "scale_down_behavior", "TERMINATE_AT_TASK_COMPLETION"), + resource.TestCheckResourceAttr("aws_emr_cluster.tf-test-cluster", "step.#", "0"), ), }, }, @@ -92,6 +93,64 @@ func TestAccAWSEMRCluster_security_config(t *testing.T) { }) } +func TestAccAWSEMRCluster_Step_Basic(t *testing.T) { + var cluster emr.Cluster + rInt := acctest.RandInt() + resourceName := "aws_emr_cluster.tf-test-cluster" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSEmrDestroy, + Steps: []resource.TestStep{ + { + Config: 
testAccAWSEmrClusterConfig_Step_Single(rInt), + Check: resource.ComposeTestCheckFunc( + testAccCheckAWSEmrClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "step.#", "1"), + resource.TestCheckResourceAttr(resourceName, "step.0.action_on_failure", "TERMINATE_CLUSTER"), + resource.TestCheckResourceAttr(resourceName, "step.0.hadoop_jar_step.0.args.0", "state-pusher-script"), + resource.TestCheckResourceAttr(resourceName, "step.0.hadoop_jar_step.0.jar", "command-runner.jar"), + resource.TestCheckResourceAttr(resourceName, "step.0.hadoop_jar_step.0.main_class", ""), + resource.TestCheckResourceAttr(resourceName, "step.0.hadoop_jar_step.0.properties.%", "0"), + resource.TestCheckResourceAttr(resourceName, "step.0.name", "Setup Hadoop Debugging"), + ), + }, + }, + }) +} + +func TestAccAWSEMRCluster_Step_Multiple(t *testing.T) { + var cluster emr.Cluster + rInt := acctest.RandInt() + resourceName := "aws_emr_cluster.tf-test-cluster" + + resource.Test(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSEmrDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSEmrClusterConfig_Step_Multiple(rInt), + Check: resource.ComposeTestCheckFunc( + testAccCheckAWSEmrClusterExists(resourceName, &cluster), + resource.TestCheckResourceAttr(resourceName, "step.#", "2"), + resource.TestCheckResourceAttr(resourceName, "step.0.action_on_failure", "TERMINATE_CLUSTER"), + resource.TestCheckResourceAttr(resourceName, "step.0.hadoop_jar_step.0.args.0", "state-pusher-script"), + resource.TestCheckResourceAttr(resourceName, "step.0.hadoop_jar_step.0.jar", "command-runner.jar"), + resource.TestCheckResourceAttr(resourceName, "step.0.name", "Setup Hadoop Debugging"), + resource.TestCheckResourceAttr(resourceName, "step.1.action_on_failure", "CONTINUE"), + resource.TestCheckResourceAttr(resourceName, "step.1.hadoop_jar_step.0.args.0", "spark-example"), + 
resource.TestCheckResourceAttr(resourceName, "step.1.hadoop_jar_step.0.args.1", "SparkPi"), + resource.TestCheckResourceAttr(resourceName, "step.1.hadoop_jar_step.0.args.2", "10"), + resource.TestCheckResourceAttr(resourceName, "step.1.hadoop_jar_step.0.jar", "command-runner.jar"), + resource.TestCheckResourceAttr(resourceName, "step.1.name", "Spark Step"), + ), + }, + }, + }) +} + func TestAccAWSEMRCluster_bootstrap_ordering(t *testing.T) { var cluster emr.Cluster rName := acctest.RandomWithPrefix("tf-emr-bootstrap") @@ -387,16 +446,17 @@ func testAccCheckAWSEmrClusterExists(n string, v *emr.Cluster) resource.TestChec return fmt.Errorf("EMR error: %v", err) } - if describe.Cluster != nil && - *describe.Cluster.Id != rs.Primary.ID { - return fmt.Errorf("EMR cluser not found") + if describe.Cluster == nil || *describe.Cluster.Id != rs.Primary.ID { + return fmt.Errorf("EMR cluster %q not found", rs.Primary.ID) } *v = *describe.Cluster - if describe.Cluster != nil && - *describe.Cluster.Status.State != "WAITING" { - return fmt.Errorf("EMR cluser is not up yet") + if describe.Cluster.Status != nil { + state := aws.StringValue(describe.Cluster.Status.State) + if state != emr.ClusterStateRunning && state != emr.ClusterStateWaiting { + return fmt.Errorf("EMR cluster %q is not RUNNING or WAITING, currently: %s", rs.Primary.ID, state) + } } return nil @@ -1484,6 +1544,152 @@ POLICY `, r, r, r, r, r, r, r, r, r, r) } +const testAccAWSEmrClusterConfig_Step_DebugLoggingStep = ` + # Example from: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-debugging.html + step { + action_on_failure = "TERMINATE_CLUSTER" + name = "Setup Hadoop Debugging" + + hadoop_jar_step { + jar = "command-runner.jar" + args = ["state-pusher-script"] + } + } +` + +const testAccAWSEmrClusterConfig_Step_SparkStep = ` + # Example from: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-submit-step.html + step { + action_on_failure = "CONTINUE" + name = "Spark Step" + + 
hadoop_jar_step { + jar = "command-runner.jar" + args = ["spark-example", "SparkPi", "10"] + } + } +` + +func testAccAWSEmrClusterConfig_Step_Multiple(rInt int) string { + stepConfig := testAccAWSEmrClusterConfig_Step_DebugLoggingStep + testAccAWSEmrClusterConfig_Step_SparkStep + return testAccAWSEmrClusterConfig_Step(rInt, stepConfig) +} + +func testAccAWSEmrClusterConfig_Step_Single(rInt int) string { + return testAccAWSEmrClusterConfig_Step(rInt, testAccAWSEmrClusterConfig_Step_DebugLoggingStep) +} + +func testAccAWSEmrClusterConfig_Step(rInt int, stepConfig string) string { + return fmt.Sprintf(` +provider "aws" { + region = "us-west-2" +} + +data "aws_availability_zones" "available" {} + +resource "aws_emr_cluster" "tf-test-cluster" { + applications = ["Spark"] + core_instance_count = 1 + core_instance_type = "c4.large" + keep_job_flow_alive_when_no_steps = true + log_uri = "s3://${aws_s3_bucket.test.bucket}/" + master_instance_type = "c4.large" + name = "emr-test-%[1]d" + release_label = "emr-5.12.0" + service_role = "EMR_DefaultRole" + termination_protection = false + + ec2_attributes { + emr_managed_master_security_group = "${aws_security_group.allow_all.id}" + emr_managed_slave_security_group = "${aws_security_group.allow_all.id}" + instance_profile = "EMR_EC2_DefaultRole" + subnet_id = "${aws_subnet.main.0.id}" + } + +%[2]s + + depends_on = ["aws_main_route_table_association.a"] +} + +resource "aws_s3_bucket" "test" { + bucket = "tf-acc-test-%[1]d" + force_destroy = true +} + +resource "aws_security_group" "allow_all" { + name = "allow_all_%[1]d" + description = "Allow all inbound traffic" + vpc_id = "${aws_vpc.main.id}" + + ingress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } + + depends_on = ["aws_subnet.main"] + + lifecycle { + ignore_changes = ["ingress", "egress"] + } + + tags { + Name = "emr_test" + } +} + +resource 
"aws_vpc" "main" { + cidr_block = "10.0.0.0/16" + enable_dns_hostnames = true + + tags { + Name = "terraform-testacc-emr-cluster-step" + } +} + +resource "aws_subnet" "main" { + availability_zone = "${element(data.aws_availability_zones.available.names, count.index)}" + count = 2 + cidr_block = "10.0.${count.index}.0/24" + vpc_id = "${aws_vpc.main.id}" + + tags { + Name = "terraform-testacc-emr-cluster-step" + } +} + +resource "aws_internet_gateway" "gw" { + vpc_id = "${aws_vpc.main.id}" + + tags { + Name = "terraform-testacc-emr-cluster-step" + } +} + +resource "aws_route_table" "r" { + vpc_id = "${aws_vpc.main.id}" + + route { + cidr_block = "0.0.0.0/0" + gateway_id = "${aws_internet_gateway.gw.id}" + } +} + +resource "aws_main_route_table_association" "a" { + route_table_id = "${aws_route_table.r.id}" + vpc_id = "${aws_vpc.main.id}" +} +`, rInt, stepConfig) +} + func testAccAWSEmrClusterConfigInstanceGroups(r int) string { return fmt.Sprintf(` resource "aws_emr_cluster" "tf-test-cluster" { diff --git a/website/docs/r/emr_cluster.html.markdown b/website/docs/r/emr_cluster.html.markdown index 2aa39ea86ec..574d50badd6 100644 --- a/website/docs/r/emr_cluster.html.markdown +++ b/website/docs/r/emr_cluster.html.markdown @@ -105,6 +105,34 @@ Started](https://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr guide for more information on these IAM roles. There is also a fully-bootable example Terraform configuration at the bottom of this page. +### Enable Debug Logging + +[Debug logging in EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-debugging.html) +is implemented as a step. It is highly recommended to utilize the +[lifecycle configuration block](/docs/configuration/resources.html) with `ignore_changes` if other +steps are being managed outside of Terraform. + +```hcl +resource "aws_emr_cluster" "example" { + # ... other configuration ... 
+ + step { + action_on_failure = "TERMINATE_CLUSTER" + name = "Setup Hadoop Debugging" + + hadoop_jar_step { + jar = "command-runner.jar" + args = ["state-pusher-script"] + } + } + + # Optional: ignore outside changes to running cluster steps + lifecycle { + ignore_changes = ["step"] + } +} +``` + ## Argument Reference The following arguments are supported: @@ -133,6 +161,7 @@ flow. Defined below * `configurations` - (Optional) List of configurations supplied for the EMR cluster you are creating * `visible_to_all_users` - (Optional) Whether the job flow is visible to all IAM users of the AWS account associated with the job flow. Default `true` * `autoscaling_role` - (Optional) An IAM role for automatic scaling policies. The IAM role provides permissions that the automatic scaling feature requires to launch and terminate EC2 instances in an instance group. +* `step` - (Optional) List of steps to run when creating the cluster. Defined below. It is highly recommended to utilize the [lifecycle configuration block](/docs/configuration/resources.html) with `ignore_changes` if other steps are being managed outside of Terraform. * `tags` - (Optional) list of tags to apply to the EMR Cluster @@ -204,6 +233,23 @@ Attributes for the EBS volumes attached to each EC2 instance in the `instance_gr * `path` - (Required) Location of the script to run during a bootstrap action. Can be either a location in Amazon S3 or on a local file system * `args` - (Optional) List of command line arguments to pass to the bootstrap action script +## step + +Attributes for step configuration + +* `action_on_failure` - (Required) The action to take if the step fails. Valid values: `TERMINATE_JOB_FLOW`, `TERMINATE_CLUSTER`, `CANCEL_AND_WAIT`, and `CONTINUE` +* `hadoop_jar_step` - (Required) The JAR file used for the step. Defined below. +* `name` - (Required) The name of the step. 
+ +### hadoop_jar_step + +Attributes for Hadoop job step configuration + +* `args` - (Optional) List of command line arguments passed to the JAR file's main function when executed. +* `jar` - (Required) Path to a JAR file run during the step. +* `main_class` - (Optional) Name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file. +* `properties` - (Optional) Key-Value map of Java properties that are set when the step runs. You can use these properties to pass key value pairs to your main function. + ## Attributes Reference The following attributes are exported: