Merge pull request #37566 from nickmhankins/glue-metadata
Add Appflow Glue Integration and allow setting pathPrefixHierarchy
johnsonaj authored Jul 24, 2024
2 parents c082eaf + a6fd23b commit 3a29801
Showing 4 changed files with 267 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .changelog/37566.txt
@@ -0,0 +1,7 @@
```release-note:enhancement
resource/aws_appflow_flow: Add `prefix_hierarchy` attribute to `destination_flow_config.s3.s3_output_format_config`
```

```release-note:enhancement
resource/aws_appflow_flow: Add `metadata_catalog_config` attribute
```
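Taken together, the two release notes above add `prefix_hierarchy` under `prefix_config` and a new top-level `metadata_catalog_config` block to `aws_appflow_flow`. The fragment below is a rough sketch of where the new arguments sit in a configuration; it is not a complete, applyable flow (the acceptance test further down has a full working config), and every name in it is a placeholder.

```terraform
resource "aws_appflow_flow" "example" {
  name = "example-flow"

  # source_flow_config, task, and trigger_config omitted for brevity;
  # see testAccFlowConfig_metadata_catalog below for a complete configuration.

  destination_flow_config {
    connector_type = "S3"
    destination_connector_properties {
      s3 {
        bucket_name = "example-destination-bucket" # placeholder
        s3_output_format_config {
          prefix_config {
            prefix_type      = "PATH"
            prefix_hierarchy = ["SCHEMA_VERSION", "EXECUTION_ID"] # new argument
          }
        }
      }
    }
  }

  # New block: catalog the flow's output in AWS Glue.
  metadata_catalog_config {
    glue_data_catalog {
      database_name = "example_database"       # placeholder
      table_prefix  = "example_prefix"         # placeholder
      role_arn      = aws_iam_role.example.arn # placeholder role reference
    }
  }
}
```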
137 changes: 136 additions & 1 deletion internal/service/appflow/flow.go
@@ -395,6 +395,15 @@ func resourceFlow() *schema.Resource {
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"prefix_hierarchy": {
Type: schema.TypeList,
Optional: true,
Computed: true,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateDiagFunc: enum.Validate[types.PathPrefix](),
},
},
"prefix_format": {
Type: schema.TypeString,
Optional: true,
@@ -632,6 +641,15 @@ func resourceFlow() *schema.Resource {
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"prefix_hierarchy": {
Type: schema.TypeList,
Optional: true,
Computed: true,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateDiagFunc: enum.Validate[types.PathPrefix](),
},
},
"prefix_format": {
Type: schema.TypeString,
Optional: true,
@@ -1259,6 +1277,38 @@ func resourceFlow() *schema.Resource {
},
},
},
"metadata_catalog_config": {
Type: schema.TypeList,
Optional: true,
Computed: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"glue_data_catalog": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
names.AttrDatabaseName: {
Type: schema.TypeString,
Required: true,
},
names.AttrRoleARN: {
Type: schema.TypeString,
Required: true,
ValidateDiagFunc: validation.ToDiagFunc(verify.ValidARN),
},
"table_prefix": {
Type: schema.TypeString,
Required: true,
},
},
},
},
},
},
},
},

CustomizeDiff: verify.SetTagsDiff,
@@ -1280,6 +1330,10 @@ func resourceFlowCreate(ctx context.Context, d *schema.ResourceData, meta interf
TriggerConfig: expandTriggerConfig(d.Get("trigger_config").([]interface{})[0].(map[string]interface{})),
}

if v, ok := d.GetOk("metadata_catalog_config"); ok {
input.MetadataCatalogConfig = expandMetadataCatalogConfig(v.([]any))
}

if v, ok := d.GetOk(names.AttrDescription); ok {
input.Description = aws.String(v.(string))
}
@@ -1348,6 +1402,14 @@ func resourceFlowRead(ctx context.Context, d *schema.ResourceData, meta interfac
d.Set("trigger_config", nil)
}

if output.MetadataCatalogConfig != nil {
if err := d.Set("metadata_catalog_config", flattenMetadataCatalogConfig(output.MetadataCatalogConfig)); err != nil {
return sdkdiag.AppendErrorf(diags, "setting metadata_catalog_config: %s", err)
}
} else {
d.Set("metadata_catalog_config", nil)
}

setTagsOut(ctx, output.Tags)

return diags
@@ -1367,6 +1429,10 @@ func resourceFlowUpdate(ctx context.Context, d *schema.ResourceData, meta interf
TriggerConfig: expandTriggerConfig(d.Get("trigger_config").([]interface{})[0].(map[string]interface{})),
}

if v, ok := d.GetOk("metadata_catalog_config"); ok {
input.MetadataCatalogConfig = expandMetadataCatalogConfig(v.([]any))
}

// always send description when updating a task
if v, ok := d.GetOk(names.AttrDescription); ok {
input.Description = aws.String(v.(string))
@@ -1529,6 +1595,10 @@ func expandPrefixConfig(tfMap map[string]interface{}) *types.PrefixConfig {
a.PrefixType = types.PrefixType(v)
}

if v, ok := tfMap["prefix_hierarchy"].([]interface{}); ok && len(v) > 0 && v[0] != nil {
a.PathPrefixHierarchy = flex.ExpandStringyValueList[types.PathPrefix](v)
}

return a
}

@@ -2602,6 +2672,70 @@ func expandScheduledTriggerProperties(tfMap map[string]interface{}) *types.Sched
return a
}

func expandMetadataCatalogConfig(tfList []any) *types.MetadataCatalogConfig {
if len(tfList) == 0 {
return nil
}

m := tfList[0].(map[string]any)

a := &types.MetadataCatalogConfig{}

if v, ok := m["glue_data_catalog"].([]any); ok && len(v) > 0 && v[0] != nil {
a.GlueDataCatalog = expandGlueDataCatalog(v[0].(map[string]any))
}

return a
}

func expandGlueDataCatalog(tfMap map[string]interface{}) *types.GlueDataCatalogConfig {
if tfMap == nil {
return nil
}

a := &types.GlueDataCatalogConfig{}

if v, ok := tfMap[names.AttrDatabaseName].(string); ok && v != "" {
a.DatabaseName = aws.String(v)
}

if v, ok := tfMap[names.AttrRoleARN].(string); ok && v != "" {
a.RoleArn = aws.String(v)
}

if v, ok := tfMap["table_prefix"].(string); ok && v != "" {
a.TablePrefix = aws.String(v)
}

return a
}

func flattenMetadataCatalogConfig(in *types.MetadataCatalogConfig) []any {
if in == nil {
return nil
}

m := map[string]any{
"glue_data_catalog": flattenGlueDataCatalog(in.GlueDataCatalog),
}

return []any{m}
}

func flattenGlueDataCatalog(in *types.GlueDataCatalogConfig) []any {
if in == nil {
return nil
}

m := map[string]any{
names.AttrDatabaseName: in.DatabaseName,
names.AttrRoleARN: in.RoleArn,
"table_prefix": in.TablePrefix,
}

return []any{m}
}

func flattenErrorHandlingConfig(errorHandlingConfig *types.ErrorHandlingConfig) map[string]interface{} {
if errorHandlingConfig == nil {
return nil
@@ -2631,6 +2765,7 @@ func flattenPrefixConfig(prefixConfig *types.PrefixConfig) map[string]interface{

m["prefix_format"] = prefixConfig.PrefixFormat
m["prefix_type"] = prefixConfig.PrefixType
m["prefix_hierarchy"] = flex.FlattenStringyValueList(prefixConfig.PathPrefixHierarchy)

return m
}
@@ -3496,7 +3631,7 @@ func flattenTask(task types.Task) map[string]interface{} {
}

if v := task.TaskProperties; v != nil {
m["task_properties"] = v
m["task_properties"] = flex.FlattenStringValueMap(v)
}

m["task_type"] = task.TaskType
114 changes: 114 additions & 0 deletions internal/service/appflow/flow_test.go
@@ -308,6 +308,36 @@ func TestAccAppFlowFlow_disappears(t *testing.T) {
})
}

func TestAccAppFlowFlow_metadataCatalog(t *testing.T) {
ctx := acctest.Context(t)
var flowOutput appflow.DescribeFlowOutput
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_appflow_flow.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, names.AppFlowServiceID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckFlowDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccFlowConfig_metadata_catalog(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckFlowExists(ctx, resourceName, &flowOutput),
resource.TestCheckResourceAttr(resourceName, "metadata_catalog_config.#", acctest.Ct1),
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.prefix_config.0.prefix_hierarchy.0", "SCHEMA_VERSION"),
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.prefix_config.0.prefix_hierarchy.1", "EXECUTION_ID"),
resource.TestCheckResourceAttr(resourceName, "destination_flow_config.0.destination_connector_properties.0.s3.0.s3_output_format_config.0.prefix_config.0.prefix_hierarchy.#", acctest.Ct2),
),
},
{
Config: testAccFlowConfig_metadata_catalog(rName),
PlanOnly: true,
},
},
})
}

func testAccFlowConfig_base(rName string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}
@@ -761,6 +791,90 @@ resource "aws_appflow_flow" "test" {
)
}

func testAccFlowConfig_metadata_catalog(rName string) string {
return acctest.ConfigCompose(
testAccFlowConfig_base(rName),
fmt.Sprintf(`
resource "aws_iam_role" "test" {
name = %[1]q
assume_role_policy = <<POLICY
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
POLICY
}
resource "aws_appflow_flow" "test" {
name = %[1]q
source_flow_config {
connector_type = "S3"
source_connector_properties {
s3 {
bucket_name = aws_s3_bucket_policy.test_source.bucket
bucket_prefix = "flow"
}
}
}
destination_flow_config {
connector_type = "S3"
destination_connector_properties {
s3 {
bucket_name = aws_s3_bucket_policy.test_destination.bucket
s3_output_format_config {
prefix_config {
prefix_type = "PATH"
prefix_hierarchy = [
"SCHEMA_VERSION",
"EXECUTION_ID",
]
}
}
}
}
}
task {
task_type = "Map_all"
connector_operator {
s3 = "NO_OP"
}
task_properties = {
"DESTINATION_DATA_TYPE" = "id"
"SOURCE_DATA_TYPE" = "id"
}
}
metadata_catalog_config {
glue_data_catalog {
database_name = "testdb_name"
table_prefix = "test_prefix"
role_arn = aws_iam_role.test.arn
}
}
trigger_config {
trigger_type = "OnDemand"
}
}
`, rName),
)
}

func testAccCheckFlowExists(ctx context.Context, n string, v *appflow.DescribeFlowOutput) resource.TestCheckFunc {
return func(s *terraform.State) error {
rs, ok := s.RootModule().Resources[n]
10 changes: 10 additions & 0 deletions website/docs/r/appflow_flow.html.markdown
@@ -141,6 +141,7 @@ This resource supports the following arguments:
* `description` - (Optional) Description of the flow you want to create.
* `kms_arn` - (Optional) ARN (Amazon Resource Name) of the Key Management Service (KMS) key you provide for encryption. This is required if you do not want to use the Amazon AppFlow-managed KMS key. If you don't provide anything here, Amazon AppFlow uses the Amazon AppFlow-managed KMS key.
* `tags` - (Optional) Key-value mapping of resource tags. If configured with a provider [`default_tags` configuration block](https://registry.terraform.io/providers/hashicorp/aws/latest/docs#default_tags-configuration-block) present, tags with matching keys will overwrite those defined at the provider-level.
* `metadata_catalog_config` - (Optional) A [Metadata Catalog Config](#metadata-catalog-config) block that specifies the configuration Amazon AppFlow uses when it catalogs the data transferred by the flow. When Amazon AppFlow catalogs flow data, it stores metadata in a data catalog.

### Destination Flow Config

@@ -252,6 +253,7 @@ EventBridge, Honeycode, and Marketo destination properties all support the follo

* `prefix_format` - (Optional) Determines the level of granularity that's included in the prefix. Valid values are `YEAR`, `MONTH`, `DAY`, `HOUR`, and `MINUTE`.
* `prefix_type` - (Optional) Determines the format of the prefix, and whether it applies to the file name, file path, or both. Valid values are `FILENAME`, `PATH`, and `PATH_AND_FILENAME`.
* `prefix_hierarchy` - (Optional) Determines whether the destination file path includes either or both of the selected elements. Valid values are `EXECUTION_ID` and `SCHEMA_VERSION`. A short sketch follows below.
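A rough fragment showing how `prefix_hierarchy` might be combined with the other `prefix_config` arguments inside an `s3_output_format_config` block (the values are illustrative only):

```terraform
s3_output_format_config {
  prefix_config {
    prefix_type      = "PATH"
    prefix_format    = "DAY"
    prefix_hierarchy = ["SCHEMA_VERSION", "EXECUTION_ID"]
  }
}
```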

##### Zendesk Destination Properties

@@ -391,6 +393,14 @@ resource "aws_appflow_flow" "example" {
}
```

### Metadata Catalog Config

The `metadata_catalog_config` block supports a single attribute, `glue_data_catalog`, which is itself a block that supports the following arguments; a short example follows the list:

* `database_name` - (Required) The name of an existing Glue database to store the metadata tables that Amazon AppFlow creates.
* `role_arn` - (Required) The ARN of an IAM role that grants AppFlow the permissions it needs to create Data Catalog tables, databases, and partitions.
* `table_prefix` - (Required) A naming prefix for each Data Catalog table that Amazon AppFlow creates.
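A rough sketch of the block; the database name, table prefix, and role reference are placeholders (the acceptance test in this PR shows one possible trust policy for the role):

```terraform
metadata_catalog_config {
  glue_data_catalog {
    database_name = "example_glue_database"  # placeholder: an existing Glue database
    table_prefix  = "example_prefix"         # placeholder
    role_arn      = aws_iam_role.example.arn # placeholder role reference
  }
}
```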

## Attribute Reference

This resource exports the following attributes in addition to the arguments above:
