Skip to content

Commit

Permalink
Merge pull request #19817 from hashicorp/b-lakeformation-19643
Browse files Browse the repository at this point in the history
r/lakeformation_permissions: Fix update bug
  • Loading branch information
YakDriver committed Jun 17, 2021
2 parents 111b55e + 5bf5b84 commit e22269b
Show file tree
Hide file tree
Showing 11 changed files with 1,207 additions and 345 deletions.
15 changes: 15 additions & 0 deletions .changelog/19817.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
```release-note:bug
resource/aws_lakeformation_permissions: Fix bug preventing updates (inconsistent result)
```

```release-note:bug
resource/aws_lakeformation_permissions: Fix bug where resource is not properly removed from state
```

```release-note:bug
resource/aws_lakeformation_permissions: Fix diffs resulting only from order of column names and exclude column names
```

```release-note:bug
data-source/aws_lakeformation_permissions: Fix diffs resulting from order of column names and exclude column names
```
98 changes: 30 additions & 68 deletions aws/data_source_aws_lakeformation_permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/lakeformation"
"github.com/hashicorp/aws-sdk-go-base/tfawserr"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/hashcode"
iamwaiter "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/iam/waiter"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/tfresource"
tflakeformation "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/lakeformation"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/lakeformation/waiter"
)

func dataSourceAwsLakeFormationPermissions() *schema.Resource {
Expand Down Expand Up @@ -134,8 +132,9 @@ func dataSourceAwsLakeFormationPermissions() *schema.Resource {
ValidateFunc: validateAwsAccountId,
},
"column_names": {
Type: schema.TypeList,
Type: schema.TypeSet,
Optional: true,
Set: schema.HashString,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateFunc: validation.NoZeroValues,
Expand All @@ -146,8 +145,9 @@ func dataSourceAwsLakeFormationPermissions() *schema.Resource {
Required: true,
},
"excluded_column_names": {
Type: schema.TypeList,
Type: schema.TypeSet,
Optional: true,
Set: schema.HashString,
Elem: &schema.Schema{
Type: schema.TypeString,
ValidateFunc: validation.NoZeroValues,
Expand Down Expand Up @@ -198,89 +198,49 @@ func dataSourceAwsLakeFormationPermissionsRead(d *schema.ResourceData, meta inte

if v, ok := d.GetOk("table"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.Resource.Table = expandLakeFormationTableResource(v.([]interface{})[0].(map[string]interface{}))
tableType = TableTypeTable
tableType = tflakeformation.TableTypeTable
}

if v, ok := d.GetOk("table_with_columns"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
// can't ListPermissions for TableWithColumns, so use Table instead
input.Resource.Table = expandLakeFormationTableWithColumnsResourceAsTable(v.([]interface{})[0].(map[string]interface{}))
tableType = TableTypeTableWithColumns
tableType = tflakeformation.TableTypeTableWithColumns
}

log.Printf("[DEBUG] Reading Lake Formation permissions: %v", input)
var allPermissions []*lakeformation.PrincipalResourcePermissions

err := resource.Retry(iamwaiter.PropagationTimeout, func() *resource.RetryError {
err := conn.ListPermissionsPages(input, func(resp *lakeformation.ListPermissionsOutput, lastPage bool) bool {
for _, permission := range resp.PrincipalResourcePermissions {
if permission == nil {
continue
}
columnNames := make([]*string, 0)
excludedColumnNames := make([]*string, 0)
columnWildcard := false

allPermissions = append(allPermissions, permission)
}
return !lastPage
})
if tableType == tflakeformation.TableTypeTableWithColumns {
if v, ok := d.GetOk("table_with_columns.0.wildcard"); ok {
columnWildcard = v.(bool)
}

if err != nil {
if tfawserr.ErrMessageContains(err, lakeformation.ErrCodeInvalidInputException, "Invalid principal") {
return resource.RetryableError(err)
if v, ok := d.GetOk("table_with_columns.0.column_names"); ok {
if v, ok := v.(*schema.Set); ok && v.Len() > 0 {
columnNames = expandStringSet(v)
}
return resource.NonRetryableError(fmt.Errorf("error reading Lake Formation Permissions: %w", err))
}
return nil
})

if tfresource.TimedOut(err) {
err = conn.ListPermissionsPages(input, func(resp *lakeformation.ListPermissionsOutput, lastPage bool) bool {
for _, permission := range resp.PrincipalResourcePermissions {
if permission == nil {
continue
}

allPermissions = append(allPermissions, permission)
if v, ok := d.GetOk("table_with_columns.0.excluded_column_names"); ok {
if v, ok := v.(*schema.Set); ok && v.Len() > 0 {
excludedColumnNames = expandStringSet(v)
}
return !lastPage
})
}
}

log.Printf("[DEBUG] Reading Lake Formation permissions: %v", input)

allPermissions, err := waiter.PermissionsReady(conn, input, tableType, columnNames, excludedColumnNames, columnWildcard)

d.SetId(fmt.Sprintf("%d", hashcode.String(input.String())))

if err != nil {
return fmt.Errorf("error reading Lake Formation permissions: %w", err)
}

var cleanPermissions []*lakeformation.PrincipalResourcePermissions

if input.Resource.Catalog != nil {
cleanPermissions = filterLakeFormationCatalogPermissions(allPermissions)
}

if input.Resource.DataLocation != nil {
cleanPermissions = filterLakeFormationDataLocationPermissions(allPermissions)
}

if input.Resource.Database != nil {
cleanPermissions = filterLakeFormationDatabasePermissions(allPermissions)
}

if tableType == TableTypeTable {
cleanPermissions = filterLakeFormationTablePermissions(
aws.StringValue(input.Resource.Table.Name),
input.Resource.Table.TableWildcard != nil,
allPermissions,
)
}

if tableType == TableTypeTableWithColumns {
cleanPermissions = filterLakeFormationTableWithColumnsPermissions(
d.Get("table_with_columns.0.database_name").(string),
d.Get("table_with_columns.0.wildcard").(bool),
expandStringList(d.Get("table_with_columns.0.column_names").([]interface{})),
expandStringList(d.Get("table_with_columns.0.excluded_column_names").([]interface{})),
allPermissions,
)
}
// clean permissions = filter out permissions that do not pertain to this specific resource
cleanPermissions := tflakeformation.FilterPermissions(input, tableType, columnNames, excludedColumnNames, columnWildcard, allPermissions)

if len(cleanPermissions) != len(allPermissions) {
log.Printf("[INFO] Resource Lake Formation clean permissions (%d) and all permissions (%d) have different lengths (this is not necessarily a problem): %s", len(cleanPermissions), len(allPermissions), d.Id())
Expand All @@ -292,6 +252,8 @@ func dataSourceAwsLakeFormationPermissionsRead(d *schema.ResourceData, meta inte

if cleanPermissions[0].Resource.Catalog != nil {
d.Set("catalog_resource", true)
} else {
d.Set("catalog_resource", false)
}

if cleanPermissions[0].Resource.DataLocation != nil {
Expand Down
181 changes: 181 additions & 0 deletions aws/internal/service/lakeformation/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package lakeformation

import (
"reflect"
"sort"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/lakeformation"
)

const (
TableNameAllTables = "ALL_TABLES"
TableTypeTable = "Table"
TableTypeTableWithColumns = "TableWithColumns"
)

func FilterPermissions(input *lakeformation.ListPermissionsInput, tableType string, columnNames []*string, excludedColumnNames []*string, columnWildcard bool, allPermissions []*lakeformation.PrincipalResourcePermissions) []*lakeformation.PrincipalResourcePermissions {
// For most Lake Formation resources, filtering within the provider is unnecessary. The input
// contains everything for AWS to give you back exactly what you want. However, many challenges
// arise with tables and tables with columns. Perhaps the two biggest problems (so far) are as
// follows:
// 1. SELECT - when you grant SELECT, it may be part of a list of permissions. However, when
// listing permissions, SELECT comes back in a separate permission.
// 2. Tables with columns. The ListPermissionsInput does not allow you to include a tables with
// columns resource. This means you might get back more permissions than actually pertain to
// the current situation. The table may have separate permissions that also come back.
//
// Thus, for most cases this is just a pass through filter but attempts to clean out
// permissions in the special cases to avoid extra permissions being included.

if input.Resource.Catalog != nil {
return FilterLakeFormationCatalogPermissions(allPermissions)
}

if input.Resource.DataLocation != nil {
return FilterLakeFormationDataLocationPermissions(allPermissions)
}

if input.Resource.Database != nil {
return FilterLakeFormationDatabasePermissions(allPermissions)
}

if tableType == TableTypeTableWithColumns {
return FilterLakeFormationTableWithColumnsPermissions(input.Resource.Table, columnNames, excludedColumnNames, columnWildcard, allPermissions)
}

if input.Resource.Table != nil || tableType == TableTypeTable {
return FilterLakeFormationTablePermissions(input.Resource.Table, allPermissions)
}

return nil
}

func FilterLakeFormationTablePermissions(table *lakeformation.TableResource, allPermissions []*lakeformation.PrincipalResourcePermissions) []*lakeformation.PrincipalResourcePermissions {
// CREATE PERMS (in) = ALL, ALTER, DELETE, DESCRIBE, DROP, INSERT, SELECT on Table, Name = (Table Name)
// LIST PERMS (out) = ALL, ALTER, DELETE, DESCRIBE, DROP, INSERT on Table, Name = (Table Name)
// LIST PERMS (out) = SELECT on TableWithColumns, Name = (Table Name), ColumnWildcard

// CREATE PERMS (in) = ALL, ALTER, DELETE, DESCRIBE, DROP, INSERT, SELECT on Table, TableWildcard
// LIST PERMS (out) = ALL, ALTER, DELETE, DESCRIBE, DROP, INSERT on Table, TableWildcard, Name = ALL_TABLES
// LIST PERMS (out) = SELECT on TableWithColumns, Name = ALL_TABLES, ColumnWildcard

var cleanPermissions []*lakeformation.PrincipalResourcePermissions

for _, perm := range allPermissions {
if perm.Resource.TableWithColumns != nil && perm.Resource.TableWithColumns.ColumnWildcard != nil {
if aws.StringValue(perm.Resource.TableWithColumns.Name) == aws.StringValue(table.Name) || (table.TableWildcard != nil && aws.StringValue(perm.Resource.TableWithColumns.Name) == TableNameAllTables) {
if len(perm.Permissions) > 0 && aws.StringValue(perm.Permissions[0]) == lakeformation.PermissionSelect {
cleanPermissions = append(cleanPermissions, perm)
continue
}

if len(perm.PermissionsWithGrantOption) > 0 && aws.StringValue(perm.PermissionsWithGrantOption[0]) == lakeformation.PermissionSelect {
cleanPermissions = append(cleanPermissions, perm)
continue
}
}
}

if perm.Resource.Table != nil && aws.StringValue(perm.Resource.Table.DatabaseName) == aws.StringValue(table.DatabaseName) {
if aws.StringValue(perm.Resource.Table.Name) == aws.StringValue(table.Name) {
cleanPermissions = append(cleanPermissions, perm)
continue
}

if perm.Resource.Table.TableWildcard != nil && table.TableWildcard != nil {
cleanPermissions = append(cleanPermissions, perm)
continue
}
}
continue
}

return cleanPermissions
}

func FilterLakeFormationTableWithColumnsPermissions(twc *lakeformation.TableResource, columnNames []*string, excludedColumnNames []*string, columnWildcard bool, allPermissions []*lakeformation.PrincipalResourcePermissions) []*lakeformation.PrincipalResourcePermissions {
// CREATE PERMS (in) = ALL, ALTER, DELETE, DESCRIBE, DROP, INSERT, SELECT on TableWithColumns, Name = (Table Name), ColumnWildcard
// LIST PERMS (out) = ALL, ALTER, DELETE, DESCRIBE, DROP, INSERT on Table, Name = (Table Name)
// LIST PERMS (out) = SELECT on TableWithColumns, Name = (Table Name), ColumnWildcard

var cleanPermissions []*lakeformation.PrincipalResourcePermissions

for _, perm := range allPermissions {
if perm.Resource.TableWithColumns != nil && perm.Resource.TableWithColumns.ColumnNames != nil {
if StringSlicesEqualIgnoreOrder(perm.Resource.TableWithColumns.ColumnNames, columnNames) {
cleanPermissions = append(cleanPermissions, perm)
continue
}
}

if perm.Resource.TableWithColumns != nil && perm.Resource.TableWithColumns.ColumnWildcard != nil && (columnWildcard || len(excludedColumnNames) > 0) {
if perm.Resource.TableWithColumns.ColumnWildcard.ExcludedColumnNames == nil && len(excludedColumnNames) == 0 {
cleanPermissions = append(cleanPermissions, perm)
continue
}

if len(excludedColumnNames) > 0 && StringSlicesEqualIgnoreOrder(perm.Resource.TableWithColumns.ColumnWildcard.ExcludedColumnNames, excludedColumnNames) {
cleanPermissions = append(cleanPermissions, perm)
continue
}
}

if perm.Resource.Table != nil && aws.StringValue(perm.Resource.Table.Name) == aws.StringValue(twc.Name) {
cleanPermissions = append(cleanPermissions, perm)
continue
}
}

return cleanPermissions
}

func FilterLakeFormationCatalogPermissions(allPermissions []*lakeformation.PrincipalResourcePermissions) []*lakeformation.PrincipalResourcePermissions {
var cleanPermissions []*lakeformation.PrincipalResourcePermissions

for _, perm := range allPermissions {
if perm.Resource.Catalog != nil {
cleanPermissions = append(cleanPermissions, perm)
}
}

return cleanPermissions
}

func FilterLakeFormationDataLocationPermissions(allPermissions []*lakeformation.PrincipalResourcePermissions) []*lakeformation.PrincipalResourcePermissions {
var cleanPermissions []*lakeformation.PrincipalResourcePermissions

for _, perm := range allPermissions {
if perm.Resource.DataLocation != nil {
cleanPermissions = append(cleanPermissions, perm)
}
}

return cleanPermissions
}

func FilterLakeFormationDatabasePermissions(allPermissions []*lakeformation.PrincipalResourcePermissions) []*lakeformation.PrincipalResourcePermissions {
var cleanPermissions []*lakeformation.PrincipalResourcePermissions

for _, perm := range allPermissions {
if perm.Resource.Database != nil {
cleanPermissions = append(cleanPermissions, perm)
}
}

return cleanPermissions
}

func StringSlicesEqualIgnoreOrder(s1, s2 []*string) bool {
if len(s1) != len(s2) {
return false
}

v1 := aws.StringValueSlice(s1)
v2 := aws.StringValueSlice(s2)

sort.Strings(v1)
sort.Strings(v2)

return reflect.DeepEqual(v1, v2)
}
Loading

0 comments on commit e22269b

Please sign in to comment.