Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalizing Online-DDL queries #7153

Merged
merged 12 commits into from
Dec 16, 2020
25 changes: 16 additions & 9 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
return onlineDDL, nil
}

// getOnlineDDLAction parses the given SQL into a statement and returns the action type of the DDL statement, or error
// if the statement is not a DDL
func getOnlineDDLAction(sql string) (action sqlparser.DDLAction, ddlStmt sqlparser.DDLStatement, err error) {
stmt, err := sqlparser.Parse(sql)
if err != nil {
return action, ddlStmt, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", sql, err)
}
switch ddlStmt := stmt.(type) {
case sqlparser.DDLStatement:
return ddlStmt.GetAction(), ddlStmt, nil
}
return action, ddlStmt, fmt.Errorf("Unsupported query type: %s", sql)
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string, requestContext string) (*OnlineDDL, error) {
u, err := createUUID("_")
Expand Down Expand Up @@ -175,15 +189,8 @@ func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) {

// GetAction extracts the DDL action type from the online DDL statement
func (onlineDDL *OnlineDDL) GetAction() (action sqlparser.DDLAction, err error) {
stmt, err := sqlparser.Parse(onlineDDL.SQL)
if err != nil {
return action, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", onlineDDL.SQL, err)
}
switch stmt := stmt.(type) {
case sqlparser.DDLStatement:
return stmt.GetAction(), nil
}
return action, fmt.Errorf("Unsupported query type: %s", onlineDDL.SQL)
action, _, err = getOnlineDDLAction(onlineDDL.SQL)
return action, err
}

// GetActionStr returns a string representation of the DDL action
Expand Down
30 changes: 30 additions & 0 deletions go/vt/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@ package schema
import (
"regexp"
"strings"

"vitess.io/vitess/go/vt/sqlparser"
)

// NormalizedDDLQuery contains a query which is online-ddl -normalized
type NormalizedDDLQuery struct {
SQL string
TableName sqlparser.TableName
}

var (
// ALTER TABLE
alterTableBasicPattern = `(?s)(?i)\balter\s+table\s+`
Expand Down Expand Up @@ -64,3 +72,25 @@ func ParseAlterTableOptions(alterStatement string) (explicitSchema, explicitTabl
}
return explicitSchema, explicitTable, alterOptions
}

// NormalizeOnlineDDL normalizes a given query for OnlineDDL, possibly exploding it into multiple distinct queries
func NormalizeOnlineDDL(sql string) (normalized []*NormalizedDDLQuery, err error) {
action, ddlStmt, err := getOnlineDDLAction(sql)
if err != nil {
return normalized, err
}
switch action {
case sqlparser.DropDDLAction:
tables := ddlStmt.GetFromTables()
for _, table := range tables {
ddlStmt.SetFromTables([]sqlparser.TableName{table})
normalized = append(normalized, &NormalizedDDLQuery{SQL: sqlparser.String(ddlStmt), TableName: table})
}
return normalized, nil
}
if ddlStmt.IsFullyParsed() {
sql = sqlparser.String(ddlStmt)
}
n := &NormalizedDDLQuery{SQL: sql, TableName: ddlStmt.GetTable()}
return []*NormalizedDDLQuery{n}, nil
}
56 changes: 47 additions & 9 deletions go/vt/schema/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package schema

import (
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseAlterTableOptions(t *testing.T) {
Expand All @@ -40,14 +43,49 @@ func TestParseAlterTableOptions(t *testing.T) {
}
for query, expect := range tests {
schema, table, options := ParseAlterTableOptions(query)
if schema != expect.schema {
t.Errorf("schema: %+v, want:%+v", schema, expect.schema)
}
if table != expect.table {
t.Errorf("table: %+v, want:%+v", table, expect.table)
}
if options != expect.options {
t.Errorf("options: %+v, want:%+v", options, expect.options)
}
assert.Equal(t, expect.schema, schema)
assert.Equal(t, expect.table, table)
assert.Equal(t, expect.options, options)
}
}

func TestNormalizeOnlineDDL(t *testing.T) {
type expect struct {
sqls []string
isError bool
}
tests := map[string]expect{
"alter table t add column i int, drop column d": {sqls: []string{"alter table t add column i int, drop column d"}},
"create table t (id int primary key)": {sqls: []string{"create table t (id int primary key)"}},
"drop table t": {sqls: []string{"drop table t"}},
"drop table if exists t": {sqls: []string{"drop table if exists t"}},
"drop table t1, t2, t3": {sqls: []string{"drop table t1", "drop table t2", "drop table t3"}},
"drop table if exists t1, t2, t3": {sqls: []string{"drop table if exists t1", "drop table if exists t2", "drop table if exists t3"}},
"create index i_idx on t(id)": {sqls: []string{"alter table t add index i_idx (id)"}},
"create index i_idx on t(name(12))": {sqls: []string{"alter table t add index i_idx (`name`(12))"}},
"create index i_idx on t(id, `ts`, name(12))": {sqls: []string{"alter table t add index i_idx (id, ts, `name`(12))"}},
"create unique index i_idx on t(id)": {sqls: []string{"alter table t add unique index i_idx (id)"}},
"create index i_idx using btree on t(id)": {sqls: []string{"alter table t add index i_idx (id) using btree"}},
"create index with syntax error i_idx on t(id)": {isError: true},
"select * from t": {isError: true},
"drop database t": {isError: true},
}
for query, expect := range tests {
t.Run(query, func(t *testing.T) {
normalized, err := NormalizeOnlineDDL(query)
if expect.isError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
sqls := []string{}
for _, n := range normalized {
sql := n.SQL
sql = strings.ReplaceAll(sql, "\n", "")
sql = strings.ReplaceAll(sql, "\t", "")
sqls = append(sqls, sql)
}
assert.Equal(t, expect.sqls, sqls)
}
})
}
}
50 changes: 27 additions & 23 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,31 @@ func (exec *TabletExecutor) preflightSchemaChanges(ctx context.Context, sqls []s
return err
}

// executeSQL executes a single SQL statement either as online DDL or synchronously on all tablets.
// In online DDL case, the query may be exploded into multiple queries during
func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResult *ExecuteResult) error {
stat, err := sqlparser.Parse(sql)
if err != nil {
return err
}
switch ddl := stat.(type) {
case sqlparser.DDLStatement:
if isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(ddl); isOnlineDDL {
exec.wr.Logger().Infof("Received online DDL request. strategy=%+v", strategy)
normalizedQueries, err := schema.NormalizeOnlineDDL(sql)
if err != nil {
return err
}
for _, normalized := range normalizedQueries {
exec.executeOnlineDDL(ctx, execResult, normalized.SQL, normalized.TableName.Name.String(), strategy, options)
}
return nil
}
}
exec.executeOnAllTablets(ctx, execResult, sql)
return nil
}

// Execute applies schema changes
func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *ExecuteResult {
execResult := ExecuteResult{}
Expand Down Expand Up @@ -247,32 +272,11 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute

for index, sql := range sqls {
execResult.CurSQLIndex = index

stat, err := sqlparser.Parse(sql)
if err != nil {
if err := exec.executeSQL(ctx, sql, &execResult); err != nil {
execResult.ExecutorErr = err.Error()
return &execResult
}
isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(nil)
tableName := ""
switch ddl := stat.(type) {
case sqlparser.DDLStatement:
switch ddl.GetAction() {
case sqlparser.DropDDLAction:
// TODO (shlomi): break into distinct per-table DROP statements; on a future PR where
// we implement lazy DROP TABLE on Online DDL
tableName = ddl.GetFromTables()[0].Name.String()
default:
tableName = ddl.GetTable().Name.String()
}
isOnlineDDL, strategy, options = exec.isOnlineSchemaDDL(ddl)
}
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy)
if isOnlineDDL {
exec.executeOnlineDDL(ctx, &execResult, sql, tableName, strategy, options)
} else {
exec.executeOnAllTablets(ctx, &execResult, sql)
}

if len(execResult.FailedShards) > 0 {
break
}
Expand Down
33 changes: 27 additions & 6 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type (
GetVindexCols() []ColIdent
AffectedTables() TableNames
SetTable(qualifier string, name string)
SetFromTables(tables TableNames)
Statement
}

Expand Down Expand Up @@ -290,7 +291,6 @@ type (
CreateIndex struct {
Constraint string
Name ColIdent
IndexType string
Table TableName
Columns []*IndexColumn
Options []*IndexOption
Expand Down Expand Up @@ -584,6 +584,26 @@ func (node *CreateView) GetFromTables() TableNames {
return nil
}

// SetFromTables implements DDLStatement.
func (node *DDL) SetFromTables(tables TableNames) {
node.FromTables = tables
}

// SetFromTables implements DDLStatement.
func (node *CreateIndex) SetFromTables(tables TableNames) {
// irrelevant
}

// SetFromTables implements DDLStatement.
func (node *CreateTable) SetFromTables(tables TableNames) {
// irrelevant
}

// SetFromTables implements DDLStatement.
func (node *CreateView) SetFromTables(tables TableNames) {
// irrelevant
}

// GetToTables implements the DDLStatement interface
func (node *DDL) GetToTables() TableNames {
return node.ToTables
Expand Down Expand Up @@ -2545,15 +2565,13 @@ func (node *SelectInto) Format(buf *TrackedBuffer) {

// Format formats the node.
func (node *CreateIndex) Format(buf *TrackedBuffer) {
buf.WriteString("create")
buf.astPrintf(node, "alter table %v add", node.Table)
if node.Constraint != "" {
buf.WriteString(" " + node.Constraint)
}
buf.astPrintf(node, " index %v", node.Name)
if node.IndexType != "" {
buf.WriteString(" using " + node.IndexType)
}
buf.astPrintf(node, " on %v (", node.Table)

buf.WriteString(" (")
for i, col := range node.Columns {
if i != 0 {
buf.astPrintf(node, ", %v", col.Column)
Expand All @@ -2569,6 +2587,9 @@ func (node *CreateIndex) Format(buf *TrackedBuffer) {
}
buf.astPrintf(node, ")")
for _, opt := range node.Options {
//if opt == nil {
// continue
//}
buf.WriteString(" " + strings.ToLower(opt.Name))
if opt.String != "" {
buf.WriteString(" " + opt.String)
Expand Down
23 changes: 15 additions & 8 deletions go/vt/sqlparser/parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1159,20 +1159,26 @@ var (
input: "alter vschema on a drop vindex `add`",
output: "alter vschema on a drop vindex `add`",
}, {
input: "create index a on b (col1)",
input: "create index a on b (col1)",
output: "alter table b add index a (col1)",
}, {
input: "create unique index a on b (col1)",
input: "create unique index a on b (col1)",
output: "alter table b add unique index a (col1)",
}, {
input: "create unique index a using foo on b (col1 desc)",
input: "create unique index a using foo on b (col1 desc)",
output: "alter table b add unique index a (col1 desc) using foo",
}, {
input: "create fulltext index a using foo on b (col1)",
input: "create fulltext index a on b (col1) with parser a",
output: "alter table b add fulltext index a (col1) with parser a",
}, {
input: "create spatial index a using foo on b (col1)",
input: "create spatial index a on b (col1)",
output: "alter table b add spatial index a (col1)",
}, {
input: "create index a on b (col1) using btree key_block_size 12 with parser 'a' comment 'string' algorithm inplace lock none",
input: "create fulltext index a on b (col1) key_block_size=12 with parser a comment 'string' algorithm inplace lock none",
output: "alter table b add fulltext index a (col1) key_block_size 12 with parser a comment 'string' algorithm inplace lock none",
}, {
input: "create index a on b ((col1 + col2), (col1*col2))",
output: "create index a on b ()",
output: "alter table b add index a ()",
partialDDL: true,
}, {
input: "create algorithm = merge sql security definer view a as select * from e",
Expand Down Expand Up @@ -1899,7 +1905,8 @@ func TestCaseSensitivity(t *testing.T) {
input: "create table A (\n\t`B` int\n)",
output: "create table A (\n\tB int\n)",
}, {
input: "create index b on A (col1 desc)",
input: "create index b on A (col1 desc)",
output: "alter table A add index b (col1 desc)",
}, {
input: "alter table A foo",
output: "alter table A",
Expand Down
Loading