Skip to content

Commit

Permalink
refactor table filter
Browse files Browse the repository at this point in the history
Signed-off-by: Toliver Jue <toliver@planetscale.com>
  • Loading branch information
Toliver Jue committed May 29, 2020
1 parent 41c356e commit c3dabaa
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 54 deletions.
143 changes: 89 additions & 54 deletions go/vt/mysqlctl/tmutils/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,94 +63,129 @@ func (tds TableDefinitions) Swap(i, j int) {
tds[i], tds[j] = tds[j], tds[i]
}

// FilterTables returns a copy which includes only whitelisted tables
type TableFilter struct {
includeViews bool

filterTables bool
tableNames []string
tableREs []*regexp.Regexp

filterExcludeTables bool
excludeTableNames []string
excludeTableREs []*regexp.Regexp
}

// NewTableFilter creates a filter for whitelisted tables
// (tables), no blacklisted tables (excludeTables) and optionally
// views (includeViews).
func FilterTables(sd *tabletmanagerdatapb.SchemaDefinition, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
copy := *sd
copy.TableDefinitions = make([]*tabletmanagerdatapb.TableDefinition, 0, len(sd.TableDefinitions))
func NewTableFilter(tables, excludeTables []string, includeViews bool) (*TableFilter, error) {
f := &TableFilter{
includeViews: includeViews,
}

// Build a list of regexp to match table names against.
// We only use regexps if the name starts and ends with '/'.
// Otherwise the entry in the arrays is nil, and we use the original
// table name.
var tableRegexps []*regexp.Regexp
if len(tables) > 0 {
tableRegexps = make([]*regexp.Regexp, len(tables))
for i, table := range tables {
if strings.HasPrefix(table, "/") {
f.filterTables = true
for _, table := range tables {
if strings.HasPrefix(table, "/") && strings.HasSuffix(table, "/") {
table = strings.Trim(table, "/")
var err error
tableRegexps[i], err = regexp.Compile(table)
re, err := regexp.Compile(table)
if err != nil {
return nil, fmt.Errorf("cannot compile regexp %v for table: %v", table, err)
}

f.tableREs = append(f.tableREs, re)
} else {
f.tableNames = append(f.tableNames, table)
}
}
}
var excludeTableRegexps []*regexp.Regexp

if len(excludeTables) > 0 {
excludeTableRegexps = make([]*regexp.Regexp, len(excludeTables))
for i, table := range excludeTables {
if strings.HasPrefix(table, "/") {
f.filterExcludeTables = true
for _, table := range excludeTables {
if strings.HasPrefix(table, "/") && strings.HasSuffix(table, "/") {
table = strings.Trim(table, "/")
var err error
excludeTableRegexps[i], err = regexp.Compile(table)
re, err := regexp.Compile(table)
if err != nil {
return nil, fmt.Errorf("cannot compile regexp %v for excludeTable: %v", table, err)
}

f.excludeTableREs = append(f.tableREs, re)
} else {
f.excludeTableNames = append(f.excludeTableNames, table)
}
}
}

for _, table := range sd.TableDefinitions {
// Check it's a table we want.
if len(tables) > 0 {
foundMatch := false
for i, tableRegexp := range tableRegexps {
if tableRegexp == nil {
// Not a regexp, just compare in a
// case insensitive way.
if strings.EqualFold(tables[i], table.Name) {
foundMatch = true
break
}
} else {
if tableRegexp.MatchString(table.Name) {
foundMatch = true
break
}
}
}
if !foundMatch {
continue
return f, nil
}

// Includes returns whether a tableName/tableType should be included in this TableFilter.
func (f *TableFilter) Includes(tableName string, tableType string) bool {
if f.filterTables {
matches := false
for _, name := range f.tableNames {
if strings.EqualFold(name, tableName) {
matches = true
break
}
}
excluded := false
for i, tableRegexp := range excludeTableRegexps {
if tableRegexp == nil {
// Not a regexp, just compare in a
// case insensitive way.
if strings.EqualFold(excludeTables[i], table.Name) {
excluded = true
break
}
} else {
if tableRegexp.MatchString(table.Name) {
excluded = true

if !matches {
for _, re := range f.tableREs {
if re.MatchString(tableName) {
matches = true
break
}
}
}
if excluded {
continue

if !matches {
return false
}
}

if !includeViews && table.Type == TableView {
continue
if f.filterExcludeTables {
for _, name := range f.excludeTableNames {
if strings.EqualFold(name, tableName) {
return false
}
}

for _, re := range f.excludeTableREs {
if re.MatchString(tableName) {
return false
}
}
}

if !f.includeViews && tableType == TableView {
return false
}

return true
}

copy.TableDefinitions = append(copy.TableDefinitions, table)
// FilterTables returns a copy which includes only whitelisted tables
// (tables), no blacklisted tables (excludeTables) and optionally
// views (includeViews).
func FilterTables(sd *tabletmanagerdatapb.SchemaDefinition, tables, excludeTables []string, includeViews bool) (*tabletmanagerdatapb.SchemaDefinition, error) {
copy := *sd
copy.TableDefinitions = make([]*tabletmanagerdatapb.TableDefinition, 0, len(sd.TableDefinitions))

f, err := NewTableFilter(tables, excludeTables, includeViews)
if err != nil {
return nil, err
}

for _, table := range sd.TableDefinitions {
if f.Includes(table.Name, table.Type) {
copy.TableDefinitions = append(copy.TableDefinitions, table)
}
}

// Regenerate hash over tables because it may have changed.
Expand Down
Loading

0 comments on commit c3dabaa

Please sign in to comment.