Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalizing Online-DDL queries #7153

Merged
merged 12 commits into from
Dec 16, 2020
25 changes: 16 additions & 9 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
return onlineDDL, nil
}

// getOnlineDDLAction parses the given SQL into a statement and returns the action type of the DDL statement, or error
// if the statement is not a DDL
func getOnlineDDLAction(sql string) (action sqlparser.DDLAction, ddlStmt sqlparser.DDLStatement, err error) {
stmt, err := sqlparser.Parse(sql)
if err != nil {
return action, ddlStmt, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", sql, err)
}
switch ddlStmt := stmt.(type) {
case sqlparser.DDLStatement:
return ddlStmt.GetAction(), ddlStmt, nil
}
return action, ddlStmt, fmt.Errorf("Unsupported query type: %s", sql)
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string, requestContext string) (*OnlineDDL, error) {
u, err := createUUID("_")
Expand Down Expand Up @@ -175,15 +189,8 @@ func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) {

// GetAction extracts the DDL action type from the online DDL statement
func (onlineDDL *OnlineDDL) GetAction() (action sqlparser.DDLAction, err error) {
stmt, err := sqlparser.Parse(onlineDDL.SQL)
if err != nil {
return action, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", onlineDDL.SQL, err)
}
switch stmt := stmt.(type) {
case sqlparser.DDLStatement:
return stmt.GetAction(), nil
}
return action, fmt.Errorf("Unsupported query type: %s", onlineDDL.SQL)
action, _, err = getOnlineDDLAction(onlineDDL.SQL)
return action, err
}

// GetActionStr returns a string representation of the DDL action
Expand Down
34 changes: 34 additions & 0 deletions go/vt/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ package schema
import (
"regexp"
"strings"

"vitess.io/vitess/go/vt/sqlparser"
)

// NormalizedDDLQuery contains a query which is online-ddl -normalized
type NormalizedDDLQuery struct {
SQL string
TableName sqlparser.TableName
}

var (
// ALTER TABLE
alterTableBasicPattern = `(?s)(?i)\balter\s+table\s+`
Expand Down Expand Up @@ -64,3 +72,29 @@ func ParseAlterTableOptions(alterStatement string) (explicitSchema, explicitTabl
}
return explicitSchema, explicitTable, alterOptions
}

// NormalizeOnlineDDL normalizes a given query for OnlineDDL, possibly exploding it into multiple distinct queries
func NormalizeOnlineDDL(sql string) (normalized []*NormalizedDDLQuery, err error) {
action, ddlStmt, err := getOnlineDDLAction(sql)
if err != nil {
return normalized, err
}
switch action {
case sqlparser.AlterDDLAction:
switch ddlStmt.(type) {
case *sqlparser.CreateIndex:
if ddlStmt.IsFullyParsed() {
sql = sqlparser.String(ddlStmt)
}
}
case sqlparser.DropDDLAction:
tables := ddlStmt.GetFromTables()
for _, table := range tables {
ddlStmt.SetFromTables([]sqlparser.TableName{table})
normalized = append(normalized, &NormalizedDDLQuery{SQL: sqlparser.String(ddlStmt), TableName: table})
}
return normalized, nil
}
n := &NormalizedDDLQuery{SQL: sql, TableName: ddlStmt.GetTable()}
return []*NormalizedDDLQuery{n}, nil
}
48 changes: 40 additions & 8 deletions go/vt/schema/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package schema

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseAlterTableOptions(t *testing.T) {
Expand All @@ -40,14 +42,44 @@ func TestParseAlterTableOptions(t *testing.T) {
}
for query, expect := range tests {
schema, table, options := ParseAlterTableOptions(query)
if schema != expect.schema {
t.Errorf("schema: %+v, want:%+v", schema, expect.schema)
}
if table != expect.table {
t.Errorf("table: %+v, want:%+v", table, expect.table)
}
if options != expect.options {
t.Errorf("options: %+v, want:%+v", options, expect.options)
assert.Equal(t, expect.schema, schema)
assert.Equal(t, expect.table, table)
assert.Equal(t, expect.options, options)
}
}

func TestNormalizeOnlineDDL(t *testing.T) {
type expect struct {
sqls []string
isError bool
}
tests := map[string]expect{
"alter table t add column i int, drop column d": {sqls: []string{"alter table t add column i int, drop column d"}},
"create table t(id int primary key)": {sqls: []string{"create table t(id int primary key)"}},
"drop table t": {sqls: []string{"drop table t"}},
"drop table if exists t": {sqls: []string{"drop table if exists t"}},
"drop table t1, t2, t3": {sqls: []string{"drop table t1", "drop table t2", "drop table t3"}},
"drop table if exists t1, t2, t3": {sqls: []string{"drop table if exists t1", "drop table if exists t2", "drop table if exists t3"}},
"create index i_idx on t(id)": {sqls: []string{"alter table t add index i_idx (id)"}},
"create index i_idx on t(name(12))": {sqls: []string{"alter table t add index i_idx (`name`(12))"}},
"create index i_idx on t(id, `ts`, name(12))": {sqls: []string{"alter table t add index i_idx (id, ts, `name`(12))"}},
"create unique index i_idx on t(id)": {sqls: []string{"alter table t add unique index i_idx (id)"}},
"create index i_idx using btree on t(id)": {sqls: []string{"alter table t add index i_idx using btree (id)"}},
"create index with syntax error i_idx on t(id)": {isError: true},
"select * from t": {isError: true},
"drop database t": {isError: true},
}
for query, expect := range tests {
normalized, err := NormalizeOnlineDDL(query)
Copy link
Collaborator

@systay systay Dec 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

for query, expect := range tests {
	t.Run(query, func(t *testing.T) {
...
	})
}

sub tests makes it look good in test output

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL actually

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if expect.isError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
sqls := []string{}
for _, n := range normalized {
sqls = append(sqls, n.SQL)
}
assert.Equal(t, expect.sqls, sqls)
}
}
}
18 changes: 8 additions & 10 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,22 +254,20 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
return &execResult
}
isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(nil)
tableName := ""
switch ddl := stat.(type) {
case sqlparser.DDLStatement:
switch ddl.GetAction() {
case sqlparser.DropDDLAction:
// TODO (shlomi): break into distinct per-table DROP statements; on a future PR where
// we implement lazy DROP TABLE on Online DDL
tableName = ddl.GetFromTables()[0].Name.String()
default:
tableName = ddl.GetTable().Name.String()
}
isOnlineDDL, strategy, options = exec.isOnlineSchemaDDL(ddl)
}
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy)
if isOnlineDDL {
exec.executeOnlineDDL(ctx, &execResult, sql, tableName, strategy, options)
normalizedQueries, err := schema.NormalizeOnlineDDL(sql)
if err != nil {
execResult.ExecutorErr = err.Error()
return &execResult
}
for _, normalized := range normalizedQueries {
exec.executeOnlineDDL(ctx, &execResult, normalized.SQL, normalized.TableName.Name.String(), strategy, options)
}
} else {
exec.executeOnAllTablets(ctx, &execResult, sql)
}
Expand Down
28 changes: 22 additions & 6 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (
GetVindexCols() []ColIdent
AffectedTables() TableNames
SetTable(qualifier string, name string)
SetFromTables(tables TableNames)
Statement
}

Expand Down Expand Up @@ -290,7 +291,6 @@ type (
CreateIndex struct {
Constraint string
Name ColIdent
IndexType string
Table TableName
Columns []*IndexColumn
Options []*IndexOption
Expand Down Expand Up @@ -530,6 +530,21 @@ func (node *CreateTable) GetFromTables() TableNames {
return nil
}

// SetFromTables implements DDLStatement.
func (node *DDL) SetFromTables(tables TableNames) {
node.FromTables = tables
}

// SetFromTables implements DDLStatement.
func (node *CreateIndex) SetFromTables(tables TableNames) {
// irrelevant
}

// SetFromTables implements DDLStatement.
func (node *CreateTable) SetFromTables(tables TableNames) {
// irrelevant
}

// GetToTables implements the DDLStatement interface
func (node *DDL) GetToTables() TableNames {
return node.ToTables
Expand Down Expand Up @@ -2470,15 +2485,13 @@ func (node *SelectInto) Format(buf *TrackedBuffer) {

// Format formats the node.
func (node *CreateIndex) Format(buf *TrackedBuffer) {
buf.WriteString("create")
buf.astPrintf(node, "alter table %v add", node.Table)
if node.Constraint != "" {
buf.WriteString(" " + node.Constraint)
}
buf.astPrintf(node, " index %v", node.Name)
if node.IndexType != "" {
buf.WriteString(" using " + node.IndexType)
}
buf.astPrintf(node, " on %v (", node.Table)

buf.WriteString(" (")
for i, col := range node.Columns {
if i != 0 {
buf.astPrintf(node, ", %v", col.Column)
Expand All @@ -2494,6 +2507,9 @@ func (node *CreateIndex) Format(buf *TrackedBuffer) {
}
buf.astPrintf(node, ")")
for _, opt := range node.Options {
//if opt == nil {
// continue
//}
buf.WriteString(" " + strings.ToLower(opt.Name))
if opt.String != "" {
buf.WriteString(" " + opt.String)
Expand Down
23 changes: 15 additions & 8 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,20 +1156,26 @@ var (
input: "alter vschema on a drop vindex `add`",
output: "alter vschema on a drop vindex `add`",
}, {
input: "create index a on b (col1)",
input: "create index a on b (col1)",
output: "alter table b add index a (col1)",
}, {
input: "create unique index a on b (col1)",
input: "create unique index a on b (col1)",
output: "alter table b add unique index a (col1)",
}, {
input: "create unique index a using foo on b (col1 desc)",
input: "create unique index a using foo on b (col1 desc)",
output: "alter table b add unique index a (col1 desc) using foo",
}, {
input: "create fulltext index a using foo on b (col1)",
input: "create fulltext index a on b (col1) with parser a",
output: "alter table b add fulltext index a (col1) with parser a",
}, {
input: "create spatial index a using foo on b (col1)",
input: "create spatial index a on b (col1)",
output: "alter table b add spatial index a (col1)",
}, {
input: "create index a on b (col1) using btree key_block_size 12 with parser 'a' comment 'string' algorithm inplace lock none",
input: "create fulltext index a on b (col1) key_block_size=12 with parser a comment 'string' algorithm inplace lock none",
output: "alter table b add fulltext index a (col1) key_block_size 12 with parser a comment 'string' algorithm inplace lock none",
}, {
input: "create index a on b ((col1 + col2), (col1*col2))",
output: "create index a on b ()",
output: "alter table b add index a ()",
partialDDL: true,
}, {
input: "create view a",
Expand Down Expand Up @@ -1883,7 +1889,8 @@ func TestCaseSensitivity(t *testing.T) {
input: "create table A (\n\t`B` int\n)",
output: "create table A (\n\tB int\n)",
}, {
input: "create index b on A (col1 desc)",
input: "create index b on A (col1 desc)",
output: "alter table A add index b (col1 desc)",
}, {
input: "alter table A foo",
output: "alter table A",
Expand Down
Loading