Skip to content

Commit

Permalink
Merge pull request #13 from DrFaust92/dag_run_id
Browse files Browse the repository at this point in the history
Support DAG Runs
  • Loading branch information
DrFaust92 authored Jan 23, 2022
2 parents 49c3263 + bfee30a commit c22e06e
Show file tree
Hide file tree
Showing 12 changed files with 333 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docs/resources/airflow_connection.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ This resource exports the following attributes:

## Import

Content can be imported using the connection key.
Connections can be imported using the connection key.

```terraform
terraform import airflow_connection.default example
Expand Down
47 changes: 47 additions & 0 deletions docs/resources/airflow_dag_run.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
---
layout: "airflow"
page_title: "Airflow: airflow_dag_run"
sidebar_current: "docs-airflow-resource-dag-run"
description: |-
Provides an Airflow dag run
---

# airflow_dag_run

Provides an Airflow dag run.

## Example Usage

```hcl
resource "airflow_dag_run" "example" {
dag_id = "example"
dag_run_id = "example"
conf = {
"example" = "example"
}
}
```

## Argument Reference

The following arguments are supported:

* `dag_id` - (Required) The DAG ID to run.
* `dag_run_id` - (Optional) The DAG Run ID. If a value is not passed, a random one will be generated based on execution date.
* `conf` - (Optional) A map describing additional configuration parameters.

## Attributes Reference

This resource exports the following attributes:

* `id` - The `dag_id:dag_run_id`.
* `state` - The DAG state.

## Import

DAG Runs can be imported using the `dag_id:dag_run_id`.

```terraform
terraform import airflow_dag_run.default example:example
```
2 changes: 1 addition & 1 deletion docs/resources/airflow_pool.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ This resource exports the following attributes:

## Import

Content can be imported using the pool name.
Pools can be imported using the pool name.

```terraform
terraform import airflow_pool.default example
Expand Down
2 changes: 1 addition & 1 deletion docs/resources/airflow_role.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ This resource exports the following attributes:

## Import

Content can be imported using the role key.
Roles can be imported using the role key.

```terraform
terraform import airflow_role.default example
Expand Down
4 changes: 2 additions & 2 deletions docs/resources/airflow_user.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ This resource exports the following attributes:
* `active` - Whether the user is active.
* `id` - The username.
* `failed_login_count` - The number of times the login failed.
* `login_count` -The login count.
* `login_count` - The login count.

## Import

Content can be imported using the user key.
Users can be imported using the user key.

```terraform
terraform import airflow_user.example example
Expand Down
2 changes: 1 addition & 1 deletion docs/resources/airflow_variable.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ This resource exports the following attributes:

## Import

Content can be imported using the variable key.
Variables can be imported using the variable key.

```terraform
terraform import airflow_variable.default example
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/apache/terraform-provider-airflow
go 1.16

require (
github.com/apache/airflow-client-go/airflow v0.0.0-20211202191715-b125451e80fd
github.com/apache/airflow-client-go/airflow v0.0.0-20220123203524-94cc2cbeaefe
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/terraform-plugin-sdk/v2 v2.10.1
golang.org/x/net v0.0.0-20211020060615-d418f374d309 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ github.com/agext/levenshtein v1.2.2 h1:0S/Yg6LYmFJ5stwQeRp6EeOcCbj7xiqQSdNelsXva
github.com/agext/levenshtein v1.2.2/go.mod h1:JEDfjyjHDjOF/1e4FlBE/PkbqA9OfWu2ki2W0IB5558=
github.com/andybalholm/crlf v0.0.0-20171020200849-670099aa064f/go.mod h1:k8feO4+kXDxro6ErPXBRTJ/ro2mf0SsFG8s7doP9kJE=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/apache/airflow-client-go/airflow v0.0.0-20211202191715-b125451e80fd h1:mrEN57zVRd8QAJkBh+MvCpbykk1eMvUe+I62+tysImc=
github.com/apache/airflow-client-go/airflow v0.0.0-20211202191715-b125451e80fd/go.mod h1:x2yDpHvQTpMyFzvwqnroMtzVgG9qFp/eJWA6kw5KTMM=
github.com/apache/airflow-client-go/airflow v0.0.0-20220123203524-94cc2cbeaefe h1:UcHH7VtRxYuPASkr6RTm0x0i8pJe9N+ihNimLCI0Yak=
github.com/apache/airflow-client-go/airflow v0.0.0-20220123203524-94cc2cbeaefe/go.mod h1:x2yDpHvQTpMyFzvwqnroMtzVgG9qFp/eJWA6kw5KTMM=
github.com/apparentlymart/go-cidr v1.0.1 h1:NmIwLZ/KdsjIUlhf+/Np40atNXm/+lZ5txfTJ/SpF+U=
github.com/apparentlymart/go-cidr v1.0.1/go.mod h1:EBcsNrHc3zQeuaeCeCtQruQm+n9/YjEn/vI25Lg7Gwc=
github.com/apparentlymart/go-dump v0.0.0-20180507223929-23540a00eaa3/go.mod h1:oL81AME2rN47vu18xqj1S1jPIPuN7afo62yKTNn3XMM=
Expand Down
1 change: 1 addition & 0 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func AirflowProvider() *schema.Provider {
},
ResourcesMap: map[string]*schema.Resource{
"airflow_connection": resourceConnection(),
"airflow_dag_run": resourceDagRun(),
"airflow_variable": resourceVariable(),
"airflow_pool": resourcePool(),
"airflow_role": resourceRole(),
Expand Down
126 changes: 126 additions & 0 deletions resource_dag_run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package main

import (
"fmt"
"strings"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func resourceDagRun() *schema.Resource {
return &schema.Resource{
Create: resourceDagRunCreate,
Read: resourceDagRunRead,
Delete: resourceDagRunDelete,
Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},
Schema: map[string]*schema.Schema{
"dag_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
},
"dag_run_id": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Computed: true,
},
"conf": {
Type: schema.TypeMap,
Optional: true,
ForceNew: true,
Elem: &schema.Schema{Type: schema.TypeString},
},
"state": {
Type: schema.TypeString,
Computed: true,
},
},
}
}

func resourceDagRunCreate(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient.DAGRunApi

dagId := d.Get("dag_id").(string)
// dagRunId := d.Get("dag_run_id").(string)

dagRun := *airflow.NewDAGRunWithDefaults()

if v, ok := d.GetOk("dag_run_id"); ok {
dagRun.SetDagRunId(v.(string))
}

if v, ok := d.GetOk("conf"); ok {
dagRun.SetConf(v.(map[string]interface{}))
}

res, _, err := client.PostDagRun(pcfg.AuthContext, dagId).DAGRun(dagRun).Execute()
if err != nil {
return fmt.Errorf("failed to create dagRunId `%s` from Airflow: %w", dagId, err)
}
d.SetId(fmt.Sprintf("%s:%s", dagId, *res.DagRunId.Get()))

return resourceDagRunRead(d, m)
}

func resourceDagRunRead(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient.DAGRunApi

dagId, dagRunId, err := airflowDagRunId(d.Id())
if err != nil {
return err
}

dagRun, resp, err := client.GetDagRun(pcfg.AuthContext, dagId, dagRunId).Execute()
if resp != nil && resp.StatusCode == 404 {
d.SetId("")
return nil
}
if err != nil {
return fmt.Errorf("failed to get dagRunId `%s` from Airflow: %w", d.Id(), err)
}

d.Set("dag_id", dagRun.DagId)
d.Set("dag_run_id", dagRun.DagRunId.Get())
d.Set("conf", dagRun.Conf)
d.Set("state", dagRun.State)

return nil
}

func resourceDagRunDelete(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient.DAGRunApi

dagId, dagRunId, err := airflowDagRunId(d.Id())
if err != nil {
return err
}

resp, err := client.DeleteDagRun(pcfg.AuthContext, dagId, dagRunId).Execute()
if err != nil {
return fmt.Errorf("failed to delete dagRunId `%s` from Airflow: %w", d.Id(), err)
}

if resp != nil && resp.StatusCode == 404 {
return nil
}

return nil
}

func airflowDagRunId(id string) (string, string, error) {
parts := strings.SplitN(id, ":", 2)

if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return "", "", fmt.Errorf("unexpected format of ID (%s), expected DAG-ID:DAG-RUN-ID", id)
}

return parts[0], parts[1], nil
}
Loading

0 comments on commit c22e06e

Please sign in to comment.