Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Glue workflow #10891

Merged
merged 3 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ func Provider() terraform.ResourceProvider {
"aws_glue_job": resourceAwsGlueJob(),
"aws_glue_security_configuration": resourceAwsGlueSecurityConfiguration(),
"aws_glue_trigger": resourceAwsGlueTrigger(),
"aws_glue_workflow": resourceAwsGlueWorkflow(),
"aws_guardduty_detector": resourceAwsGuardDutyDetector(),
"aws_guardduty_invite_accepter": resourceAwsGuardDutyInviteAccepter(),
"aws_guardduty_ipset": resourceAwsGuardDutyIpset(),
Expand Down
37 changes: 30 additions & 7 deletions aws/resource_aws_glue_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,15 @@ func resourceAwsGlueTrigger() *schema.Resource {
Type: schema.TypeMap,
Optional: true,
},
"crawler_name": {
Type: schema.TypeString,
Optional: true,
ConflictsWith: []string{"actions.0.job_name"},
},
"job_name": {
Type: schema.TypeString,
Required: true,
Type: schema.TypeString,
Optional: true,
ConflictsWith: []string{"actions.0.crawler_name"},
},
"timeout": {
Type: schema.TypeInt,
Expand Down Expand Up @@ -126,6 +132,11 @@ func resourceAwsGlueTrigger() *schema.Resource {
glue.TriggerTypeScheduled,
}, false),
},
"workflow_name": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
},
},
}
}
Expand Down Expand Up @@ -157,6 +168,10 @@ func resourceAwsGlueTriggerCreate(d *schema.ResourceData, meta interface{}) erro
input.StartOnCreation = aws.Bool(true)
}

if v, ok := d.GetOk("workflow_name"); ok {
input.WorkflowName = aws.String(v.(string))
}

log.Printf("[DEBUG] Creating Glue Trigger: %s", input)
_, err := conn.CreateTrigger(input)
if err != nil {
Expand Down Expand Up @@ -234,6 +249,7 @@ func resourceAwsGlueTriggerRead(d *schema.ResourceData, meta interface{}) error
d.Set("name", trigger.Name)
d.Set("schedule", trigger.Schedule)
d.Set("type", trigger.Type)
d.Set("workflow_name", trigger.WorkflowName)

return nil
}
Expand Down Expand Up @@ -365,8 +381,14 @@ func expandGlueActions(l []interface{}) []*glue.Action {
for _, mRaw := range l {
m := mRaw.(map[string]interface{})

action := &glue.Action{
JobName: aws.String(m["job_name"].(string)),
action := &glue.Action{}

if crawlerName, ok := m["crawler_name"]; ok && crawlerName != "" {
action.CrawlerName = aws.String(crawlerName.(string))
}

if jobName, ok := m["job_name"]; ok && jobName != "" {
action.JobName = aws.String(jobName.(string))
}

argumentsMap := make(map[string]string)
Expand Down Expand Up @@ -422,9 +444,10 @@ func flattenGlueActions(actions []*glue.Action) []interface{} {

for _, action := range actions {
m := map[string]interface{}{
"arguments": aws.StringValueMap(action.Arguments),
"job_name": aws.StringValue(action.JobName),
"timeout": int(aws.Int64Value(action.Timeout)),
"arguments": aws.StringValueMap(action.Arguments),
"crawler_name": aws.StringValue(action.CrawlerName),
"job_name": aws.StringValue(action.JobName),
"timeout": int(aws.Int64Value(action.Timeout)),
}
l = append(l, m)
}
Expand Down
98 changes: 97 additions & 1 deletion aws/resource_aws_glue_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func testSweepGlueTriggers(region string) error {
name := aws.StringValue(trigger.Name)

log.Printf("[INFO] Deleting Glue Trigger: %s", name)
err := deleteGlueJob(conn, name)
err := deleteGlueTrigger(conn, name)
if err != nil {
log.Printf("[ERROR] Failed to delete Glue Trigger %s: %s", name, err)
}
Expand Down Expand Up @@ -88,6 +88,40 @@ func TestAccAWSGlueTrigger_Basic(t *testing.T) {
})
}

func TestAccAWSGlueTrigger_Crawler(t *testing.T) {
var trigger glue.Trigger

rName := fmt.Sprintf("tf-acc-test-%s", acctest.RandString(5))
resourceName := "aws_glue_trigger.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSGlueTriggerDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSGlueTriggerConfig_Crawler(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSGlueTriggerExists(resourceName, &trigger),
resource.TestCheckResourceAttr(resourceName, "actions.#", "1"),
resource.TestCheckResourceAttr(resourceName, "actions.0.crawler_name", rName),
resource.TestCheckResourceAttr(resourceName, "description", ""),
resource.TestCheckResourceAttr(resourceName, "enabled", "true"),
resource.TestCheckResourceAttr(resourceName, "name", rName),
resource.TestCheckResourceAttr(resourceName, "predicate.#", "0"),
resource.TestCheckResourceAttr(resourceName, "schedule", ""),
resource.TestCheckResourceAttr(resourceName, "type", "ON_DEMAND"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func TestAccAWSGlueTrigger_Description(t *testing.T) {
var trigger glue.Trigger

Expand Down Expand Up @@ -239,6 +273,33 @@ func TestAccAWSGlueTrigger_Schedule(t *testing.T) {
})
}

func TestAccAWSGlueTrigger_WorkflowName(t *testing.T) {
var trigger glue.Trigger

rName := fmt.Sprintf("tf-acc-test-%s", acctest.RandString(5))
resourceName := "aws_glue_trigger.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckAWSGlueTriggerDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSGlueTriggerConfig_WorkflowName(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAWSGlueTriggerExists(resourceName, &trigger),
resource.TestCheckResourceAttr(resourceName, "workflow_name", rName),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
},
},
})
}

func testAccCheckAWSGlueTriggerExists(resourceName string, trigger *glue.Trigger) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[resourceName]
Expand Down Expand Up @@ -350,6 +411,21 @@ resource "aws_glue_trigger" "test" {
`, testAccAWSGlueJobConfig_Required(rName), rName)
}

func testAccAWSGlueTriggerConfig_Crawler(rName string) string {
return fmt.Sprintf(`
%s

resource "aws_glue_trigger" "test" {
name = "%s"
type = "ON_DEMAND"

actions {
crawler_name = "${aws_glue_crawler.test.name}"
}
}
`, testAccGlueCrawlerConfig_DynamodbTarget(rName, "table1"), rName)
}

func testAccAWSGlueTriggerConfig_Predicate(rName, state string) string {
return fmt.Sprintf(`
%s
Expand Down Expand Up @@ -398,3 +474,23 @@ resource "aws_glue_trigger" "test" {
}
`, testAccAWSGlueJobConfig_Required(rName), rName, schedule)
}

func testAccAWSGlueTriggerConfig_WorkflowName(rName string) string {
return fmt.Sprintf(`
%s

resource "aws_glue_workflow" test {
name = "%s"
}

resource "aws_glue_trigger" "test" {
name = "%s"
type = "ON_DEMAND"
workflow_name = "${aws_glue_workflow.test.name}"

actions {
job_name = "${aws_glue_job.test.name}"
}
}
`, testAccAWSGlueJobConfig_Required(rName), rName, rName)
}
160 changes: 160 additions & 0 deletions aws/resource_aws_glue_workflow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package aws

import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/glue"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
"log"
)

func resourceAwsGlueWorkflow() *schema.Resource {
return &schema.Resource{
Create: resourceAwsGlueWorkflowCreate,
Read: resourceAwsGlueWorkflowRead,
Update: resourceAwsGlueWorkflowUpdate,
Delete: resourceAwsGlueWorkflowDelete,
Importer: &schema.ResourceImporter{
State: schema.ImportStatePassthrough,
},

Schema: map[string]*schema.Schema{
"default_run_properties": {
Type: schema.TypeMap,
Optional: true,
Elem: schema.TypeString,
},
"description": {
Type: schema.TypeString,
Optional: true,
},
"name": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
ValidateFunc: validation.NoZeroValues,
},
},
}
}

func resourceAwsGlueWorkflowCreate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).glueconn
name := d.Get("name").(string)

input := &glue.CreateWorkflowInput{
Name: aws.String(name),
}

if kv, ok := d.GetOk("default_run_properties"); ok {
defaultRunPropertiesMap := make(map[string]string)
for k, v := range kv.(map[string]interface{}) {
defaultRunPropertiesMap[k] = v.(string)
}
input.DefaultRunProperties = aws.StringMap(defaultRunPropertiesMap)
}

if v, ok := d.GetOk("description"); ok {
input.Description = aws.String(v.(string))
}

log.Printf("[DEBUG] Creating Glue Workflow: %s", input)
_, err := conn.CreateWorkflow(input)
if err != nil {
return fmt.Errorf("error creating Glue Trigger (%s): %s", name, err)
}
d.SetId(name)

return resourceAwsGlueWorkflowRead(d, meta)
}

func resourceAwsGlueWorkflowRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).glueconn

input := &glue.GetWorkflowInput{
Name: aws.String(d.Id()),
}

log.Printf("[DEBUG] Reading Glue Workflow: %s", input)
output, err := conn.GetWorkflow(input)
if err != nil {
if isAWSErr(err, glue.ErrCodeEntityNotFoundException, "") {
log.Printf("[WARN] Glue Workflow (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}
return fmt.Errorf("error reading Glue Workflow (%s): %s", d.Id(), err)
}

workflow := output.Workflow
if workflow == nil {
log.Printf("[WARN] Glue Workflow (%s) not found, removing from state", d.Id())
d.SetId("")
return nil
}

if err := d.Set("default_run_properties", aws.StringValueMap(workflow.DefaultRunProperties)); err != nil {
return fmt.Errorf("error setting default_run_properties: %s", err)
}
d.Set("description", workflow.Description)
d.Set("name", workflow.Name)

return nil
}

func resourceAwsGlueWorkflowUpdate(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).glueconn

input := &glue.UpdateWorkflowInput{
Name: aws.String(d.Get("name").(string)),
}

if kv, ok := d.GetOk("default_run_properties"); ok {
defaultRunPropertiesMap := make(map[string]string)
for k, v := range kv.(map[string]interface{}) {
defaultRunPropertiesMap[k] = v.(string)
}
input.DefaultRunProperties = aws.StringMap(defaultRunPropertiesMap)
}

if v, ok := d.GetOk("description"); ok {
input.Description = aws.String(v.(string))
}

log.Printf("[DEBUG] Updating Glue Workflow: %s", input)
_, err := conn.UpdateWorkflow(input)
if err != nil {
return fmt.Errorf("error updating Glue Workflow (%s): %s", d.Id(), err)
}

return resourceAwsGlueWorkflowRead(d, meta)
}

func resourceAwsGlueWorkflowDelete(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).glueconn

log.Printf("[DEBUG] Deleting Glue Workflow: %s", d.Id())
err := deleteWorkflow(conn, d.Id())
if err != nil {
return fmt.Errorf("error deleting Glue Workflow (%s): %s", d.Id(), err)
}

return nil
}

func deleteWorkflow(conn *glue.Glue, name string) error {
input := &glue.DeleteWorkflowInput{
Name: aws.String(name),
}

_, err := conn.DeleteWorkflow(input)
if err != nil {
if isAWSErr(err, glue.ErrCodeEntityNotFoundException, "") {
return nil
}
return err
}

return nil
}
Loading