Skip to content

Commit

Permalink
Switch to databricks_mount in the exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
alexott committed Dec 28, 2021
1 parent 9af3fc8 commit a4e8eba
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 161 deletions.
14 changes: 8 additions & 6 deletions exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"testing"
"time"
Expand Down Expand Up @@ -104,7 +105,7 @@ func TestImportingMounts(t *testing.T) {
Status: "Finished",
Results: &common.CommandResults{
ResultType: "text",
Data: `{"foo": "s3a://foo", "bar": "abfss://bar@baz.com/thing", "third": "adls://foo.bar.com/path"}
Data: `{"foo": "s3a://foo", "bar": "abfss://bar@baz.com/thing", "third": "adls://foo3.bar.com/path", "fourth":"wasbs@bar4.baz4.com/dir", "fifth":"gs://foo5"}
and some chatty messages`,
},
},
Expand Down Expand Up @@ -172,18 +173,18 @@ func TestImportingMounts(t *testing.T) {
ic.listing = "mounts"
ic.mounts = true

err := ic.Importables["databricks_aws_s3_mount"].List(ic)
err := ic.Importables["databricks_mount"].List(ic)
assert.NoError(t, err)

err = ic.Importables["databricks_azure_adls_gen2_mount"].List(ic)
err = ic.Importables["databricks_mount"].List(ic)
assert.NoError(t, err)
err = ic.Importables["databricks_azure_adls_gen2_mount"].Body(ic,
err = ic.Importables["databricks_mount"].Body(ic,
hclwrite.NewEmptyFile().Body(), ic.Scope[1])
assert.NoError(t, err)

err = ic.Importables["databricks_azure_adls_gen1_mount"].List(ic)
err = ic.Importables["databricks_mount"].List(ic)
assert.NoError(t, err)
err = ic.Importables["databricks_azure_adls_gen1_mount"].Body(ic,
err = ic.Importables["databricks_mount"].Body(ic,
hclwrite.NewEmptyFile().Body(), ic.Scope[2])
assert.NoError(t, err)

Expand Down Expand Up @@ -551,6 +552,7 @@ func TestImportingClusters(t *testing.T) {
services, _ := ic.allServicesAndListing()
ic.services = services

log.Printf("[INFO] PATH='%s'", os.Getenv("PATH"))
err := ic.Run()
assert.NoError(t, err)
})
Expand Down
279 changes: 126 additions & 153 deletions exporter/importables.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,101 @@ import (
)

// URL patterns used to classify a mount source by its storage backend.
// wasb(s) URLs have their own pattern (wasbsRegex) instead of being folded
// into adlsGen2Regex.
var (
	// AWS S3: group 1 = scheme (s3 or s3a), 2 = bucket, 3 = optional path.
	s3Regex = regexp.MustCompile(`^(s3a?)://([^/]+)(/.*)?$`)

	// Google Cloud Storage: group 1 = bucket, 2 = optional path.
	gsRegex = regexp.MustCompile(`^gs://([^/]+)(/.*)?$`)

	// ADLS Gen2: group 1 = scheme (abfs or abfss), 2 = container,
	// 3 = storage account (first host label), 4 = optional path.
	adlsGen2Regex = regexp.MustCompile(`^(abfss?)://([^@]+)@([^.]+)\.(?:[^/]+)(/.*)?$`)

	// ADLS Gen1: group 1 = scheme (adl or adls), 2 = storage resource
	// (first host label), 3 = optional path.
	adlsGen1Regex = regexp.MustCompile(`^(adls?)://([^.]+)\.(?:[^/]+)(/.*)?$`)

	// Azure Blob Storage: group 1 = scheme (wasb or wasbs), 2 = container,
	// 3 = storage account (first host label), 4 = optional path.
	wasbsRegex = regexp.MustCompile(`^(wasbs?)://([^@]+)@([^.]+)\.(?:[^/]+)(/.*)?$`)
)

// generateMountBody renders the HCL resource block for the mount r into body.
//
// The mount's source URL is looked up in ic.mountMap by r.ID and tried against
// the package-level URL patterns in this order: S3, GCS, ABFS (ADLS Gen2),
// ADL (ADLS Gen1), WASB. The first match decides which nested storage block
// ("s3", "gs", "abfs", "adl" or "wasb") is written. For the Azure variants the
// credential attributes are set to whatever ic.variable returns (presumably
// Terraform variable references - confirm against ic.variable).
//
// The resource block is assembled detached and attached to body only after a
// handler matched, so a failed call leaves body untouched. An error is
// returned when r.ID has no entry in ic.mountMap or when the URL matches none
// of the patterns.
func generateMountBody(ic *importContext, body *hclwrite.Body, r *resource) error {
	mount, ok := ic.mountMap[r.ID]
	if !ok {
		return fmt.Errorf("no mount information for: %s", r.ID)
	}

	resBlock := hclwrite.NewBlock("resource", []string{r.Resource, r.Name})
	b := resBlock.Body()
	// Only a leading "/mnt/" is stripped; the rest of the ID is kept verbatim.
	b.SetAttributeValue("name", cty.StringVal(strings.TrimPrefix(r.ID, "/mnt/")))

	if res := s3Regex.FindStringSubmatch(mount.URL); res != nil {
		block := b.AppendNewBlock("s3", nil).Body()
		block.SetAttributeValue("bucket_name", cty.StringVal(res[2]))
		// instance_profile lives inside the s3 block, cluster_id on the resource itself.
		if mount.InstanceProfile != "" {
			block.SetAttributeValue("instance_profile", cty.StringVal(mount.InstanceProfile))
		} else if mount.ClusterID != "" {
			b.SetAttributeValue("cluster_id", cty.StringVal(mount.ClusterID))
		}
	} else if res := gsRegex.FindStringSubmatch(mount.URL); res != nil {
		block := b.AppendNewBlock("gs", nil).Body()
		block.SetAttributeValue("bucket_name", cty.StringVal(res[1]))
		if mount.ClusterID != "" {
			b.SetAttributeValue("cluster_id", cty.StringVal(mount.ClusterID))
		}
	} else if res := adlsGen2Regex.FindStringSubmatch(mount.URL); res != nil {
		containerName := res[2]
		storageAccountName := res[3]
		block := b.AppendNewBlock("abfs", nil).Body()
		block.SetAttributeValue("container_name", cty.StringVal(containerName))
		block.SetAttributeValue("storage_account_name", cty.StringVal(storageAccountName))
		// A bare "/" path means the container root, so no directory is emitted.
		if res[4] != "" && res[4] != "/" {
			block.SetAttributeValue("directory", cty.StringVal(res[4]))
		}

		// The "_abfs" suffix keeps these variable names apart from the WASB ones
		// generated for the same account/container pair.
		varName := ic.regexFix("_"+storageAccountName+"_"+containerName+"_abfs", ic.nameFixes)
		textStr := fmt.Sprintf(" for mounting ADLSv2 resource %s://%s@%s",
			res[1], containerName, storageAccountName)

		block.SetAttributeRaw("client_id", ic.variable(
			"client_id"+varName, "Client ID"+textStr))
		block.SetAttributeRaw("tenant_id", ic.variable(
			"tenant_id"+varName, "Tenant ID"+textStr))
		block.SetAttributeRaw("client_secret_scope", ic.variable(
			"client_secret_scope"+varName,
			"Secret scope name that stores app client secret"+textStr))
		block.SetAttributeRaw("client_secret_key", ic.variable(
			"client_secret_key"+varName,
			"Key in secret scope that stores app client secret"+textStr))
	} else if res := adlsGen1Regex.FindStringSubmatch(mount.URL); res != nil {
		block := b.AppendNewBlock("adl", nil).Body()
		storageResourceName := res[2]
		block.SetAttributeValue("storage_resource_name", cty.StringVal(storageResourceName))
		if res[3] != "" && res[3] != "/" {
			block.SetAttributeValue("directory", cty.StringVal(res[3]))
		}
		varName := ic.regexFix("_"+storageResourceName+"_adl", ic.nameFixes)
		textStr := fmt.Sprintf(" for mounting ADLSv1 resource %s://%s", res[1], storageResourceName)

		block.SetAttributeRaw("client_id", ic.variable("client_id"+varName, "Client ID"+textStr))
		block.SetAttributeRaw("tenant_id", ic.variable("tenant_id"+varName, "Tenant ID"+textStr))
		block.SetAttributeRaw("client_secret_scope", ic.variable(
			"client_secret_scope"+varName, "Secret scope name that stores app client secret"+textStr))
		block.SetAttributeRaw("client_secret_key", ic.variable(
			"client_secret_key"+varName, "Key in secret scope that stores app client secret"+textStr))
	} else if res := wasbsRegex.FindStringSubmatch(mount.URL); res != nil {
		containerName := res[2]
		storageAccountName := res[3]
		block := b.AppendNewBlock("wasb", nil).Body()
		block.SetAttributeValue("container_name", cty.StringVal(containerName))
		block.SetAttributeValue("storage_account_name", cty.StringVal(storageAccountName))
		if res[4] != "" && res[4] != "/" {
			block.SetAttributeValue("directory", cty.StringVal(res[4]))
		}
		block.SetAttributeValue("auth_type", cty.StringVal("ACCESS_KEY"))

		varName := ic.regexFix("_"+storageAccountName+"_"+containerName+"_wasb", ic.nameFixes)
		textStr := fmt.Sprintf(" for mounting WASB resource %s://%s@%s",
			res[1], containerName, storageAccountName)

		// NOTE(review): the variable names and descriptions below still say
		// "client secret" although auth_type is ACCESS_KEY; they are left as-is
		// so the generated variable names stay stable - confirm whether they
		// should be renamed.
		block.SetAttributeRaw("token_secret_scope", ic.variable(
			"client_secret_scope"+varName,
			"Secret scope name that stores app client secret"+textStr))
		block.SetAttributeRaw("token_secret_key", ic.variable(
			"client_secret_key"+varName,
			"Key in secret scope that stores app client secret"+textStr))
	} else {
		return fmt.Errorf("no matching handler for: %s", mount.URL)
	}

	// Attach the finished block only now, so the error paths above emit nothing.
	body.AppendBlock(resBlock)
	body.AppendNewline()

	return nil
}

var resourcesMap map[string]importable = map[string]importable{
"databricks_dbfs_file": {
Service: "storage",
Expand Down Expand Up @@ -560,8 +650,9 @@ var resourcesMap map[string]importable = map[string]importable{
{Path: "principal", Resource: "databricks_user", Match: "user_name"},
},
},
"databricks_aws_s3_mount": {
"databricks_mount": {
Service: "mounts",
Body: generateMountBody,
List: func(ic *importContext) error {
if !ic.mounts {
return nil
Expand All @@ -570,174 +661,56 @@ var resourcesMap map[string]importable = map[string]importable{
return err
}
for mountName, source := range ic.mountMap {
if !strings.HasPrefix(source.URL, "s3a://") {
continue
}
if !ic.MatchesName(mountName) {
continue
}
log.Printf("[INFO] Emitting databricks_aws_s3_mount: %s",
source.URL)
attrs := map[string]string{
"s3_bucket_name": strings.ReplaceAll(source.URL, "s3a://", ""),
"mount_name": mountName,
}
if source.InstanceProfile != "" {
attrs["instance_profile"] = source.InstanceProfile
ic.Emit(&resource{
Resource: "databricks_instance_profile",
ID: source.InstanceProfile,
})
} else if source.ClusterID != "" {
attrs["cluster_id"] = source.ClusterID
ic.Emit(&resource{
Resource: "databricks_cluster",
ID: source.ClusterID,
})
if strings.HasPrefix(source.URL, "s3a://") {
log.Printf("[INFO] Emitting databricks_mount: %s", source.URL)
if source.InstanceProfile != "" {
ic.Emit(&resource{
Resource: "databricks_instance_profile",
ID: source.InstanceProfile,
})
} else if source.ClusterID != "" {
ic.Emit(&resource{
Resource: "databricks_cluster",
ID: source.ClusterID,
})
}
} else if strings.HasPrefix(source.URL, "gs://") {
if source.ClusterID != "" {
ic.Emit(&resource{
Resource: "databricks_cluster",
ID: source.ClusterID,
})
}
} else if res := adlsGen2Regex.FindStringSubmatch(source.URL); res != nil {
} else if res := adlsGen1Regex.FindStringSubmatch(source.URL); res != nil {
} else if res := wasbsRegex.FindStringSubmatch(source.URL); res != nil {
} else {
log.Printf("[INFO] No matching handler for: %s", source.URL)
continue
}
log.Printf("[INFO] Emitting databricks_mount: %s", source.URL)
ic.Emit(&resource{
Resource: "databricks_mount",
ID: mountName,
Resource: "databricks_aws_s3_mount",
Data: ic.Resources["databricks_aws_s3_mount"].Data(
Data: ic.Resources["databricks_mount"].Data(
&terraform.InstanceState{
ID: mountName,
Attributes: attrs,
Attributes: map[string]string{},
}),
})

}
return nil
},
Depends: []reference{
{Path: "s3_bucket_name", Resource: "aws_s3_bucket", Match: "bucket"},
{Path: "s3_bucket_name", Resource: "aws_s3_bucket", Match: "bucket"}, // this should be changed somehow & avoid clashes with GCS bucket_name
{Path: "instance_profile", Resource: "databricks_instance_profile"},
{Path: "cluster_id", Resource: "databricks_cluster"},
},
},
"databricks_azure_adls_gen2_mount": {
Service: "mounts",
List: func(ic *importContext) error {
if !ic.mounts {
return nil
}
if err := ic.refreshMounts(); err != nil {
return err
}
for mountName, source := range ic.mountMap {
if res := adlsGen2Regex.FindStringSubmatch(source.URL); res == nil {
log.Printf("[DEBUG] skipping %s mounted at %s", source, mountName)
continue
}
if !ic.MatchesName(mountName) {
continue
}
ic.Emit(&resource{
Resource: "databricks_azure_adls_gen2_mount",
ID: mountName,
Data: ic.Resources["databricks_azure_adls_gen2_mount"].Data(
&terraform.InstanceState{
ID: mountName,
// don't open another command/context
Attributes: map[string]string{},
}),
})
}
return nil
},
Body: func(ic *importContext, body *hclwrite.Body, r *resource) error {
b := body.AppendNewBlock("resource", []string{r.Resource, r.Name}).Body()

mount := ic.mountMap[r.ID]
res := adlsGen2Regex.FindStringSubmatch(mount.URL)
if res == nil {
return fmt.Errorf("can't extract ADLSv2 information from string '%s'", mount)
}
containerName := res[2]
storageAccountName := res[3]
b.SetAttributeValue("container_name", cty.StringVal(containerName))
b.SetAttributeValue("storage_account_name", cty.StringVal(storageAccountName))
if res[4] != "" && res[4] != "/" {
b.SetAttributeValue("directory", cty.StringVal(res[4]))
}
b.SetAttributeValue("mount_name", cty.StringVal(strings.Replace(r.ID, "/mnt/", "", 1)))

varName := "_" + storageAccountName + "_" + containerName
textStr := fmt.Sprintf(" for mounting ADLSv2 resource %s://%s@%s",
res[1], containerName, storageAccountName)

b.SetAttributeRaw("client_id", ic.variable(
"client_id"+varName, "Client ID"+textStr))
b.SetAttributeRaw("tenant_id", ic.variable(
"tenant_id"+varName, "Tenant ID"+textStr))
b.SetAttributeRaw("client_secret_scope", ic.variable(
"client_secret_scope"+varName,
"Secret scope name that stores app client secret"+textStr))
b.SetAttributeRaw("client_secret_key", ic.variable(
"client_secret_key"+varName,
"Key in secret scope that stores app client secret"+textStr))

return nil
},
Depends: []reference{
{Path: "storage_account_name", Resource: "azurerm_storage_account", Match: "name"},
{Path: "storage_account_name", Resource: "azurerm_storage_account", Match: "name"}, // similarly for WASBS vs ABFSS
{Path: "container_name", Resource: "azurerm_storage_container", Match: "name"},
},
},
"databricks_azure_adls_gen1_mount": {
Service: "mounts",
List: func(ic *importContext) error {
if !ic.mounts {
return nil
}
if err := ic.refreshMounts(); err != nil {
return err
}
for mountName, source := range ic.mountMap {
if res := adlsGen1Regex.FindStringSubmatch(source.URL); res == nil {
continue
}
if !ic.MatchesName(mountName) {
continue
}
ic.Emit(&resource{
Resource: "databricks_azure_adls_gen1_mount",
ID: mountName,
Data: ic.Resources["databricks_azure_adls_gen2_mount"].Data(
&terraform.InstanceState{
ID: mountName,
// don't open another command/context
Attributes: map[string]string{},
}),
})
}
return nil
},
Body: func(ic *importContext, body *hclwrite.Body, r *resource) error {
b := body.AppendNewBlock("resource", []string{r.Resource, r.Name}).Body()

mount := ic.mountMap[r.ID]
res := adlsGen1Regex.FindStringSubmatch(mount.URL)
if res == nil {
return fmt.Errorf("can't extract ADLSv1 information from string '%s'", mount)
}
storageResourceName := res[2]
b.SetAttributeValue("storage_resource_name", cty.StringVal(storageResourceName))
if res[3] != "" && res[3] != "/" {
b.SetAttributeValue("directory", cty.StringVal(res[3]))
}
b.SetAttributeValue("mount_name", cty.StringVal(strings.Replace(r.ID, "/mnt/", "", 1)))
varName := "_" + storageResourceName
textStr := fmt.Sprintf(" for mounting ADLSv1 resource %s://%s", res[1], storageResourceName)

b.SetAttributeRaw("client_id", ic.variable("client_id"+varName, "Client ID"+textStr))
b.SetAttributeRaw("tenant_id", ic.variable("tenant_id"+varName, "Tenant IDs"+textStr))
b.SetAttributeRaw("client_secret_scope", ic.variable(
"client_secret_scope"+varName, "Secret scope name that stores app client secret"+textStr))
b.SetAttributeRaw("client_secret_key", ic.variable(
"client_secret_key"+varName, "Key in secret scope that stores app client secret"+textStr))

return nil
},
Depends: []reference{
{Path: "storage_resource_name", Resource: "azurerm_data_lake_store", Match: "name"},
},
},
Expand Down
4 changes: 2 additions & 2 deletions exporter/importables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,11 +519,11 @@ func TestAwsS3MountProfile(t *testing.T) {
URL: "s3a://def",
InstanceProfile: "bcd",
}
err := resourcesMap["databricks_aws_s3_mount"].List(ic)
err := resourcesMap["databricks_mount"].List(ic)
assert.NoError(t, err)
assert.Len(t, ic.testEmits, 2)
assert.True(t, ic.testEmits["databricks_instance_profile[<unknown>] (id: bcd)"])
assert.True(t, ic.testEmits["databricks_aws_s3_mount[<unknown>] (id: /mnt/abc)"])
assert.True(t, ic.testEmits["databricks_mount[<unknown>] (id: /mnt/abc)"])
}

func TestGlobalInitScriptNameFromId(t *testing.T) {
Expand Down

0 comments on commit a4e8eba

Please sign in to comment.