Skip to content

Commit

Permalink
Merge pull request #8239 from planetscale/online-dd-vrepl-json
Browse files Browse the repository at this point in the history
Online DDL/Vreplication: column type awareness
  • Loading branch information
shlomi-noach authored Jun 7, 2021
2 parents f53dd15 + bdb18bb commit 027ef58
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 50 deletions.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

14 changes: 11 additions & 3 deletions go/vt/vttablet/onlineddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,23 +299,31 @@ const (
AND ACTION_TIMING='AFTER'
AND LEFT(TRIGGER_NAME, 7)='pt_osc_'
`
sqlSelectColumnTypes = `
select
*
from
information_schema.columns
where
table_schema=%a
and table_name=%a
`
selSelectCountFKParentConstraints = `
SELECT
COUNT(*) as num_fk_constraints
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE
REFERENCED_TABLE_SCHEMA=%a AND REFERENCED_TABLE_NAME=%a
AND REFERENCED_TABLE_NAME IS NOT NULL
`
`
selSelectCountFKChildConstraints = `
SELECT
COUNT(*) as num_fk_constraints
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE
TABLE_SCHEMA=%a AND TABLE_NAME=%a
AND REFERENCED_TABLE_NAME IS NOT NULL
`

`
sqlDropTrigger = "DROP TRIGGER IF EXISTS `%a`.`%a`"
sqlShowTablesLike = "SHOW TABLES LIKE '%a'"
sqlCreateTableLike = "CREATE TABLE `%a` LIKE `%a`"
Expand Down
77 changes: 75 additions & 2 deletions go/vt/vttablet/onlineddl/vrepl.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,64 @@ func (v *VRepl) readTableColumns(ctx context.Context, conn *dbconnpool.DBConnect
return vrepl.NewColumnList(columnNames), vrepl.NewColumnList(virtualColumnNames), vrepl.NewColumnList(pkColumnNames), nil
}

// applyColumnTypes
func (v *VRepl) applyColumnTypes(ctx context.Context, conn *dbconnpool.DBConnection, tableName string, columnsLists ...*vrepl.ColumnList) error {
query, err := sqlparser.ParseAndBind(sqlSelectColumnTypes,
sqltypes.StringBindVariable(v.dbName),
sqltypes.StringBindVariable(tableName),
)
if err != nil {
return err
}
rs, err := conn.ExecuteFetch(query, math.MaxInt64, true)
if err != nil {
return err
}
for _, row := range rs.Named().Rows {
columnName := row["COLUMN_NAME"].ToString()
columnType := row["COLUMN_TYPE"].ToString()
columnOctetLength := row.AsUint64("CHARACTER_OCTET_LENGTH", 0)

for _, columnsList := range columnsLists {
column := columnsList.GetColumn(columnName)
if column == nil {
continue
}

if strings.Contains(columnType, "unsigned") {
column.IsUnsigned = true
}
if strings.Contains(columnType, "mediumint") {
column.Type = vrepl.MediumIntColumnType
}
if strings.Contains(columnType, "timestamp") {
column.Type = vrepl.TimestampColumnType
}
if strings.Contains(columnType, "datetime") {
column.Type = vrepl.DateTimeColumnType
}
if strings.Contains(columnType, "json") {
column.Type = vrepl.JSONColumnType
}
if strings.Contains(columnType, "float") {
column.Type = vrepl.FloatColumnType
}
if strings.HasPrefix(columnType, "enum") {
column.Type = vrepl.EnumColumnType
column.EnumValues = vrepl.ParseEnumValues(columnType)
}
if strings.HasPrefix(columnType, "binary") {
column.Type = vrepl.BinaryColumnType
column.BinaryOctetLength = columnOctetLength
}
if charset := row.AsString("CHARACTER_SET_NAME", ""); charset != "" {
column.Charset = charset
}
}
}
return nil
}

// getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (v *VRepl) getSharedColumns(sourceColumns, targetColumns *vrepl.ColumnList, sourceVirtualColumns, targetVirtualColumns *vrepl.ColumnList, columnRenameMap map[string]string) (
sourceSharedColumns *vrepl.ColumnList, targetSharedColumns *vrepl.ColumnList, sharedColumnsMap map[string]string,
Expand Down Expand Up @@ -293,6 +351,13 @@ func (v *VRepl) analyzeTables(ctx context.Context, conn *dbconnpool.DBConnection
return fmt.Errorf("Found no shared PRIMARY KEY columns between `%s` and `%s`", v.sourceTable, v.targetTable)
}

if err := v.applyColumnTypes(ctx, conn, v.sourceTable, sourceColumns, sourceVirtualColumns, sourcePKColumns, v.sourceSharedColumns, v.sharedPKColumns); err != nil {
return err
}
if err := v.applyColumnTypes(ctx, conn, v.targetTable, targetColumns, targetVirtualColumns, targetPKColumns, v.targetSharedColumns); err != nil {
return err
}

v.sourceAutoIncrement, err = v.readAutoIncrement(ctx, conn, v.sourceTable)
if err != nil {
return err
Expand All @@ -309,13 +374,21 @@ func (v *VRepl) generateFilterQuery(ctx context.Context) error {
}
var sb strings.Builder
sb.WriteString("select ")
for i, name := range v.sourceSharedColumns.Names() {
for i, col := range v.sourceSharedColumns.Columns() {
name := col.Name
targetName := v.sharedColumnsMap[name]

if i > 0 {
sb.WriteString(", ")
}
sb.WriteString(escapeName(name))
switch col.Type {
case vrepl.JSONColumnType:
sb.WriteString("convert(")
sb.WriteString(escapeName(name))
sb.WriteString(" using utf8mb4)")
default:
sb.WriteString(escapeName(name))
}
sb.WriteString(" as ")
sb.WriteString(escapeName(targetName))
}
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vttablet/onlineddl/vrepl/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`)
renameTableRegexp = regexp.MustCompile(`(?i)\brename\s+(to|as)\s+`)
autoIncrementRegexp = regexp.MustCompile(`(?i)\bauto_increment[\s]*[=]?[\s]*([0-9]+)`)
enumValuesRegexp = regexp.MustCompile("^enum[(](.*)[)]$")
)

// AlterTableParser is a parser tool for ALTER TABLE statements
Expand Down Expand Up @@ -198,3 +199,11 @@ func (p *AlterTableParser) GetAlterStatementOptions() string {
func (p *AlterTableParser) ColumnRenameMap() map[string]string {
return p.columnRenameMap
}

// ParseEnumValues parses the comma delimited part of an enum column definition
func ParseEnumValues(enumColumnType string) string {
if submatch := enumValuesRegexp.FindStringSubmatch(enumColumnType); len(submatch) > 0 {
return submatch[1]
}
return enumColumnType
}
25 changes: 24 additions & 1 deletion go/vt/vttablet/onlineddl/vrepl/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,32 @@ import (
"strings"
)

// ColumnType indicated some MySQL data types
type ColumnType int

const (
UnknownColumnType ColumnType = iota
TimestampColumnType
DateTimeColumnType
EnumColumnType
MediumIntColumnType
JSONColumnType
FloatColumnType
BinaryColumnType
)

// Column represents a table column
type Column struct {
Name string
Name string
IsUnsigned bool
Charset string
Type ColumnType
EnumValues string
EnumToTextConversion bool

// add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog.
// https://github.com/github/gh-ost/issues/909
BinaryOctetLength uint64
}

// NewColumns creates a new column array from non empty names
Expand Down

0 comments on commit 027ef58

Please sign in to comment.