Skip to content

Commit

Permalink
Merge pull request #2 from DrFaust92/sdk-v2
Browse files Browse the repository at this point in the history
sdk v2
  • Loading branch information
DrFaust92 authored Oct 29, 2021
2 parents e576e27 + 1fdf693 commit cb243c1
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 112 deletions.
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
TEST?=$$(go list ./...)

build:
go build -o terraform-provider-airflow

testacc:
TF_ACC=1 go test $(TEST) -v $(TESTARGS) -timeout 5m
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.16
require (
github.com/apache/airflow-client-go/airflow v0.0.0-20210618063701-c4bfdb8caedb
github.com/golang/protobuf v1.5.2 // indirect
github.com/hashicorp/terraform-plugin-sdk v1.10.0
github.com/hashicorp/terraform-plugin-sdk/v2 v2.4.3
golang.org/x/net v0.0.0-20211020060615-d418f374d309 // indirect
golang.org/x/oauth2 v0.0.0-20211028175245-ba495a64dcb5 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
155 changes: 99 additions & 56 deletions go.sum

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package main

import (
"github.com/hashicorp/terraform-plugin-sdk/plugin"
"github.com/hashicorp/terraform-plugin-sdk/terraform"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/plugin"
)

func main() {
plugin.Serve(&plugin.ServeOpts{
ProviderFunc: func() terraform.ResourceProvider {
return Provider()
ProviderFunc: func() *schema.Provider {
return AirflowProvider()
},
})
}
80 changes: 33 additions & 47 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,74 +6,60 @@ import (
"net/url"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

type ProviderConfig struct {
ApiClient *airflow.APIClient
AuthContext context.Context
}

func Provider() *schema.Provider {
func AirflowProvider() *schema.Provider {
return &schema.Provider{
Schema: map[string]*schema.Schema{
"base_endpoint": {
Type: schema.TypeString,
Required: true,
},
// username and password are used for API basic auth
"username": {
Type: schema.TypeString,
Optional: true,
Description: "The username to use for API basic authentication",
},
"password": {
Type: schema.TypeString,
Optional: true,
Description: "The password to use for API basic authentication",
Required: true,
DefaultFunc: schema.EnvDefaultFunc("AIRFLOW_BASE_ENDPOINT", nil),
},
"oauth2_token": {
Type: schema.TypeString,
Optional: true,
Description: "The username to use for API basic authentication",
Description: "The oauth to use for API authentication",
DefaultFunc: schema.EnvDefaultFunc("AIRFLOW_OAUTH2_TOKEN", nil),
},
},
ResourcesMap: map[string]*schema.Resource{
"airflow_variable": resourceVariable(),
"airflow_variable": resourceVariable(),
"airflow_connection": resourceConnection(),
},
ConfigureFunc: func(p *schema.ResourceData) (interface{}, error) {
endpoint := p.Get("base_endpoint").(string)
u, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid base_endpoint: %w", err)
}

// basePath := path.Join(u.Path + "/api/experimental")
// log.Printf("[DEBUG] Using API prefix: %s", basePath)
ConfigureFunc: providerConfigure,
}
}

authCtx := context.Background()
func providerConfigure(d *schema.ResourceData) (interface{}, error) {
endpoint := d.Get("base_endpoint").(string)
u, err := url.Parse(endpoint)
if err != nil {
return nil, fmt.Errorf("invalid base_endpoint: %w", err)
}

// if username, ok := p.GetOk("username"); ok {
// var password interface{}
// if password, ok = p.GetOk("password"); !ok {
// return nil, fmt.Errorf("Found username for basic auth, but password not specified.")
// }
// log.Printf("[DEBUG] Using API Basic Auth")
authCtx := context.Background()

// cred := airflow.ContextOAuth2.BasicAuth{
// UserName: username.(string),
// Password: password.(string),
// }
authCtx = context.WithValue(authCtx, airflow.ContextOAuth2, p.Get("oauth2_token").(string))
// }
authCtx = context.WithValue(authCtx, airflow.ContextAccessToken, d.Get("oauth2_token"))

return ProviderConfig{
ApiClient: airflow.NewAPIClient(&airflow.Configuration{
Scheme: u.Scheme,
Host: u.Host,
}),
AuthContext: authCtx,
}, nil
},
}
return ProviderConfig{
ApiClient: airflow.NewAPIClient(&airflow.Configuration{
Scheme: u.Scheme,
Host: u.Host,
Debug: true,
Servers: airflow.ServerConfigurations{
{
URL: "/api/v1",
Description: "Apache Airflow Stable API.",
},
},
}),
AuthContext: authCtx,
}, nil
}
37 changes: 37 additions & 0 deletions provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package main

import (
"os"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

var testAccProviders map[string]*schema.Provider
var testAccProvider *schema.Provider

func init() {
testAccProvider = AirflowProvider()
testAccProviders = map[string]*schema.Provider{
"airflow": testAccProvider,
}
}

func TestProvider(t *testing.T) {
if err := AirflowProvider().InternalValidate(); err != nil {
t.Fatalf("err: %s", err)
}
}

func TestProvider_impl(t *testing.T) {
var _ *schema.Provider = AirflowProvider()
}

func testAccPreCheck(t *testing.T) {
if v := os.Getenv("AIRFLOW_OAUTH2_TOKEN"); v == "" {
t.Fatal("AIRFLOW_OAUTH2_TOKEN must be set for acceptance tests")
}
if v := os.Getenv("AIRFLOW_BASE_ENDPOINT"); v == "" {
t.Fatal("AIRFLOW_BASE_ENDPOINT must be set for acceptance tests")
}
}
121 changes: 121 additions & 0 deletions resource_connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package main

import (
"fmt"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func resourceConnectionCreate(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
connType := d.Get("conn_type").(string)

conn := airflow.Connection{
ConnectionId: &connId,
ConnType: &connType,
}
connApi := client.ConnectionApi

_, _, err := connApi.PostConnection(pcfg.AuthContext).Connection(conn).Execute()
if err != nil {
return fmt.Errorf("failed to create connection `%s` from Airflow: %w", connId, err)
}
d.SetId(connId)
return resourceConnectionRead(d, m)
}

func resourceConnectionRead(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
connection, resp, err := client.ConnectionApi.GetConnection(pcfg.AuthContext, connId).Execute()
if resp != nil && resp.StatusCode == 404 {
d.SetId("")
return nil
}
if err != nil {
return fmt.Errorf("failed to get connection `%s` from Airflow: %w", connId, err)
}

d.Set("connection_id", connection.ConnectionId)
d.Set("conn_type", connection.ConnType)
return nil
}

func resourceConnectionUpdate(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
connType := d.Get("conn_type").(string)

conn := airflow.Connection{
ConnectionId: &connId,
ConnType: &connType,
}
_, _, err := client.ConnectionApi.PatchConnection(pcfg.AuthContext, connId).Connection(conn).Execute()
if err != nil {
return fmt.Errorf("failed to update connection `%s` from Airflow: %w", connId, err)
}
d.SetId(connId)
return resourceConnectionRead(d, m)
}

func resourceConnectionDelete(d *schema.ResourceData, m interface{}) error {
pcfg := m.(ProviderConfig)
client := pcfg.ApiClient
connId := d.Get("connection_id").(string)
_, err := client.ConnectionApi.DeleteConnection(pcfg.AuthContext, connId).Execute()
if err != nil {
return fmt.Errorf("failed to delete connection `%s` from Airflow: %w", connId, err)
}

return nil
}

func resourceConnection() *schema.Resource {
return &schema.Resource{
Create: resourceConnectionCreate,
Read: resourceConnectionRead,
Update: resourceConnectionUpdate,
Delete: resourceConnectionDelete,

Schema: map[string]*schema.Schema{
"connection_id": {
Type: schema.TypeString,
Required: true,
},
"conn_type": {
Type: schema.TypeString,
Required: true,
},
"host": {
Type: schema.TypeString,
Optional: true,
},
"login": {
Type: schema.TypeString,
Optional: true,
},
"schema": {
Type: schema.TypeString,
Optional: true,
},
"port": {
Type: schema.TypeInt,
Optional: true,
},
"password": {
Type: schema.TypeString,
Optional: true,
Sensitive: true,
},
"extra": {
Type: schema.TypeString,
Optional: true,
},
},
}
}
38 changes: 38 additions & 0 deletions resource_connection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccAirflowConnection_basic(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")

resourceName := "airflow_connection.test"
resource.Test(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAirflowConnectionConfigBasic(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttr(resourceName, "connection_id", rName),
resource.TestCheckResourceAttr(resourceName, "conn_type", rName),
),
},
},
})
}

func testAccAirflowConnectionConfigBasic(rName string) string {
return fmt.Sprintf(`
resource "airflow_connection" "test" {
connection_id = %[1]q
conn_type = %[1]q
}
`, rName)
}
12 changes: 8 additions & 4 deletions resource_variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"

"github.com/apache/airflow-client-go/airflow"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func resourceVariableCreate(d *schema.ResourceData, m interface{}) error {
Expand All @@ -19,7 +19,7 @@ func resourceVariableCreate(d *schema.ResourceData, m interface{}) error {
Value: &val,
}).Execute()
if err != nil {
return err
return fmt.Errorf("failed to create variable `%s` from Airflow: %w", key, err)
}
d.SetId(key)
return resourceVariableRead(d, m)
Expand Down Expand Up @@ -53,7 +53,7 @@ func resourceVariableUpdate(d *schema.ResourceData, m interface{}) error {
Value: &val,
}).Execute()
if err != nil {
return err
return fmt.Errorf("failed to update variable `%s` from Airflow: %w", key, err)
}
d.SetId(key)
return resourceVariableRead(d, m)
Expand All @@ -64,7 +64,11 @@ func resourceVariableDelete(d *schema.ResourceData, m interface{}) error {
client := pcfg.ApiClient
key := d.Get("key").(string)
_, err := client.VariableApi.DeleteVariable(pcfg.AuthContext, key).Execute()
return err
if err != nil {
return fmt.Errorf("failed to delete variable `%s` from Airflow: %w", key, err)
}

return nil
}

func resourceVariable() *schema.Resource {
Expand Down
Loading

0 comments on commit cb243c1

Please sign in to comment.