Skip to content

Commit

Permalink
Merge pull request #7153 from planetscale/online-ddl-normalize-queries
Browse files Browse the repository at this point in the history
Normalizing Online-DDL queries
  • Loading branch information
shlomi-noach authored Dec 16, 2020
2 parents 777a19d + 95fba79 commit 54e01f0
Show file tree
Hide file tree
Showing 11 changed files with 2,981 additions and 2,877 deletions.
25 changes: 16 additions & 9 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
return onlineDDL, nil
}

// getOnlineDDLAction parses the given SQL into a statement and returns the action type of the DDL statement, or error
// if the statement is not a DDL
func getOnlineDDLAction(sql string) (action sqlparser.DDLAction, ddlStmt sqlparser.DDLStatement, err error) {
stmt, err := sqlparser.Parse(sql)
if err != nil {
return action, ddlStmt, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", sql, err)
}
switch ddlStmt := stmt.(type) {
case sqlparser.DDLStatement:
return ddlStmt.GetAction(), ddlStmt, nil
}
return action, ddlStmt, fmt.Errorf("Unsupported query type: %s", sql)
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string, requestContext string) (*OnlineDDL, error) {
u, err := createUUID("_")
Expand Down Expand Up @@ -175,15 +189,8 @@ func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) {

// GetAction extracts the DDL action type from the online DDL statement
func (onlineDDL *OnlineDDL) GetAction() (action sqlparser.DDLAction, err error) {
stmt, err := sqlparser.Parse(onlineDDL.SQL)
if err != nil {
return action, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", onlineDDL.SQL, err)
}
switch stmt := stmt.(type) {
case sqlparser.DDLStatement:
return stmt.GetAction(), nil
}
return action, fmt.Errorf("Unsupported query type: %s", onlineDDL.SQL)
action, _, err = getOnlineDDLAction(onlineDDL.SQL)
return action, err
}

// GetActionStr returns a string representation of the DDL action
Expand Down
30 changes: 30 additions & 0 deletions go/vt/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ package schema
import (
"regexp"
"strings"

"vitess.io/vitess/go/vt/sqlparser"
)

// NormalizedDDLQuery contains a query which is online-ddl -normalized
type NormalizedDDLQuery struct {
SQL string
TableName sqlparser.TableName
}

var (
// ALTER TABLE
alterTableBasicPattern = `(?s)(?i)\balter\s+table\s+`
Expand Down Expand Up @@ -64,3 +72,25 @@ func ParseAlterTableOptions(alterStatement string) (explicitSchema, explicitTabl
}
return explicitSchema, explicitTable, alterOptions
}

// NormalizeOnlineDDL normalizes a given query for OnlineDDL, possibly exploding it into multiple distinct queries
func NormalizeOnlineDDL(sql string) (normalized []*NormalizedDDLQuery, err error) {
action, ddlStmt, err := getOnlineDDLAction(sql)
if err != nil {
return normalized, err
}
switch action {
case sqlparser.DropDDLAction:
tables := ddlStmt.GetFromTables()
for _, table := range tables {
ddlStmt.SetFromTables([]sqlparser.TableName{table})
normalized = append(normalized, &NormalizedDDLQuery{SQL: sqlparser.String(ddlStmt), TableName: table})
}
return normalized, nil
}
if ddlStmt.IsFullyParsed() {
sql = sqlparser.String(ddlStmt)
}
n := &NormalizedDDLQuery{SQL: sql, TableName: ddlStmt.GetTable()}
return []*NormalizedDDLQuery{n}, nil
}
56 changes: 47 additions & 9 deletions go/vt/schema/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package schema

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseAlterTableOptions(t *testing.T) {
Expand All @@ -40,14 +43,49 @@ func TestParseAlterTableOptions(t *testing.T) {
}
for query, expect := range tests {
schema, table, options := ParseAlterTableOptions(query)
if schema != expect.schema {
t.Errorf("schema: %+v, want:%+v", schema, expect.schema)
}
if table != expect.table {
t.Errorf("table: %+v, want:%+v", table, expect.table)
}
if options != expect.options {
t.Errorf("options: %+v, want:%+v", options, expect.options)
}
assert.Equal(t, expect.schema, schema)
assert.Equal(t, expect.table, table)
assert.Equal(t, expect.options, options)
}
}

func TestNormalizeOnlineDDL(t *testing.T) {
type expect struct {
sqls []string
isError bool
}
tests := map[string]expect{
"alter table t add column i int, drop column d": {sqls: []string{"alter table t add column i int, drop column d"}},
"create table t (id int primary key)": {sqls: []string{"create table t (id int primary key)"}},
"drop table t": {sqls: []string{"drop table t"}},
"drop table if exists t": {sqls: []string{"drop table if exists t"}},
"drop table t1, t2, t3": {sqls: []string{"drop table t1", "drop table t2", "drop table t3"}},
"drop table if exists t1, t2, t3": {sqls: []string{"drop table if exists t1", "drop table if exists t2", "drop table if exists t3"}},
"create index i_idx on t(id)": {sqls: []string{"alter table t add index i_idx (id)"}},
"create index i_idx on t(name(12))": {sqls: []string{"alter table t add index i_idx (`name`(12))"}},
"create index i_idx on t(id, `ts`, name(12))": {sqls: []string{"alter table t add index i_idx (id, ts, `name`(12))"}},
"create unique index i_idx on t(id)": {sqls: []string{"alter table t add unique index i_idx (id)"}},
"create index i_idx using btree on t(id)": {sqls: []string{"alter table t add index i_idx (id) using btree"}},
"create index with syntax error i_idx on t(id)": {isError: true},
"select * from t": {isError: true},
"drop database t": {isError: true},
}
for query, expect := range tests {
t.Run(query, func(t *testing.T) {
normalized, err := NormalizeOnlineDDL(query)
if expect.isError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
sqls := []string{}
for _, n := range normalized {
sql := n.SQL
sql = strings.ReplaceAll(sql, "\n", "")
sql = strings.ReplaceAll(sql, "\t", "")
sqls = append(sqls, sql)
}
assert.Equal(t, expect.sqls, sqls)
}
})
}
}
50 changes: 27 additions & 23 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ func (exec *TabletExecutor) preflightSchemaChanges(ctx context.Context, sqls []s
return err
}

// executeSQL executes a single SQL statement either as online DDL or synchronously on all tablets.
// In online DDL case, the query may be exploded into multiple queries during
func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResult *ExecuteResult) error {
stat, err := sqlparser.Parse(sql)
if err != nil {
return err
}
switch ddl := stat.(type) {
case sqlparser.DDLStatement:
if isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(ddl); isOnlineDDL {
exec.wr.Logger().Infof("Received online DDL request. strategy=%+v", strategy)
normalizedQueries, err := schema.NormalizeOnlineDDL(sql)
if err != nil {
return err
}
for _, normalized := range normalizedQueries {
exec.executeOnlineDDL(ctx, execResult, normalized.SQL, normalized.TableName.Name.String(), strategy, options)
}
return nil
}
}
exec.executeOnAllTablets(ctx, execResult, sql)
return nil
}

// Execute applies schema changes
func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *ExecuteResult {
execResult := ExecuteResult{}
Expand Down Expand Up @@ -247,32 +272,11 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute

for index, sql := range sqls {
execResult.CurSQLIndex = index

stat, err := sqlparser.Parse(sql)
if err != nil {
if err := exec.executeSQL(ctx, sql, &execResult); err != nil {
execResult.ExecutorErr = err.Error()
return &execResult
}
isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(nil)
tableName := ""
switch ddl := stat.(type) {
case sqlparser.DDLStatement:
switch ddl.GetAction() {
case sqlparser.DropDDLAction:
// TODO (shlomi): break into distinct per-table DROP statements; on a future PR where
// we implement lazy DROP TABLE on Online DDL
tableName = ddl.GetFromTables()[0].Name.String()
default:
tableName = ddl.GetTable().Name.String()
}
isOnlineDDL, strategy, options = exec.isOnlineSchemaDDL(ddl)
}
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy)
if isOnlineDDL {
exec.executeOnlineDDL(ctx, &execResult, sql, tableName, strategy, options)
} else {
exec.executeOnAllTablets(ctx, &execResult, sql)
}

if len(execResult.FailedShards) > 0 {
break
}
Expand Down
33 changes: 27 additions & 6 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (
GetVindexCols() []ColIdent
AffectedTables() TableNames
SetTable(qualifier string, name string)
SetFromTables(tables TableNames)
Statement
}

Expand Down Expand Up @@ -290,7 +291,6 @@ type (
CreateIndex struct {
Constraint string
Name ColIdent
IndexType string
Table TableName
Columns []*IndexColumn
Options []*IndexOption
Expand Down Expand Up @@ -584,6 +584,26 @@ func (node *CreateView) GetFromTables() TableNames {
return nil
}

// SetFromTables implements DDLStatement.
func (node *DDL) SetFromTables(tables TableNames) {
node.FromTables = tables
}

// SetFromTables implements DDLStatement.
func (node *CreateIndex) SetFromTables(tables TableNames) {
// irrelevant
}

// SetFromTables implements DDLStatement.
func (node *CreateTable) SetFromTables(tables TableNames) {
// irrelevant
}

// SetFromTables implements DDLStatement.
func (node *CreateView) SetFromTables(tables TableNames) {
// irrelevant
}

// GetToTables implements the DDLStatement interface
func (node *DDL) GetToTables() TableNames {
return node.ToTables
Expand Down Expand Up @@ -2545,15 +2565,13 @@ func (node *SelectInto) Format(buf *TrackedBuffer) {

// Format formats the node.
func (node *CreateIndex) Format(buf *TrackedBuffer) {
buf.WriteString("create")
buf.astPrintf(node, "alter table %v add", node.Table)
if node.Constraint != "" {
buf.WriteString(" " + node.Constraint)
}
buf.astPrintf(node, " index %v", node.Name)
if node.IndexType != "" {
buf.WriteString(" using " + node.IndexType)
}
buf.astPrintf(node, " on %v (", node.Table)

buf.WriteString(" (")
for i, col := range node.Columns {
if i != 0 {
buf.astPrintf(node, ", %v", col.Column)
Expand All @@ -2569,6 +2587,9 @@ func (node *CreateIndex) Format(buf *TrackedBuffer) {
}
buf.astPrintf(node, ")")
for _, opt := range node.Options {
//if opt == nil {
// continue
//}
buf.WriteString(" " + strings.ToLower(opt.Name))
if opt.String != "" {
buf.WriteString(" " + opt.String)
Expand Down
23 changes: 15 additions & 8 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,20 +1159,26 @@ var (
input: "alter vschema on a drop vindex `add`",
output: "alter vschema on a drop vindex `add`",
}, {
input: "create index a on b (col1)",
input: "create index a on b (col1)",
output: "alter table b add index a (col1)",
}, {
input: "create unique index a on b (col1)",
input: "create unique index a on b (col1)",
output: "alter table b add unique index a (col1)",
}, {
input: "create unique index a using foo on b (col1 desc)",
input: "create unique index a using foo on b (col1 desc)",
output: "alter table b add unique index a (col1 desc) using foo",
}, {
input: "create fulltext index a using foo on b (col1)",
input: "create fulltext index a on b (col1) with parser a",
output: "alter table b add fulltext index a (col1) with parser a",
}, {
input: "create spatial index a using foo on b (col1)",
input: "create spatial index a on b (col1)",
output: "alter table b add spatial index a (col1)",
}, {
input: "create index a on b (col1) using btree key_block_size 12 with parser 'a' comment 'string' algorithm inplace lock none",
input: "create fulltext index a on b (col1) key_block_size=12 with parser a comment 'string' algorithm inplace lock none",
output: "alter table b add fulltext index a (col1) key_block_size 12 with parser a comment 'string' algorithm inplace lock none",
}, {
input: "create index a on b ((col1 + col2), (col1*col2))",
output: "create index a on b ()",
output: "alter table b add index a ()",
partialDDL: true,
}, {
input: "create algorithm = merge sql security definer view a as select * from e",
Expand Down Expand Up @@ -1899,7 +1905,8 @@ func TestCaseSensitivity(t *testing.T) {
input: "create table A (\n\t`B` int\n)",
output: "create table A (\n\tB int\n)",
}, {
input: "create index b on A (col1 desc)",
input: "create index b on A (col1 desc)",
output: "alter table A add index b (col1 desc)",
}, {
input: "alter table A foo",
output: "alter table A",
Expand Down
Loading

0 comments on commit 54e01f0

Please sign in to comment.