Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removing online ddl query hint from ALTER TABLE #7069

Merged
merged 11 commits into from
Nov 25, 2020
9 changes: 5 additions & 4 deletions go/test/endtoend/onlineddl/onlineddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ var (
ALTER TABLE %s
DROP PRIMARY KEY,
DROP COLUMN ghost_col`
// We will run this query with "gh-ost --max-load=Threads_running=1"
alterTableThrottlingStatement = `
ALTER WITH 'gh-ost' '--max-load=Threads_running=1' TABLE %s
ALTER TABLE %s
DROP COLUMN ghost_col`
)

Expand Down Expand Up @@ -164,20 +165,20 @@ func TestSchemaChange(t *testing.T) {
checkRetryMigration(t, uuid, false)
}
{
uuid := testAlterTable(t, alterTableTrivialStatement, string(schema.DDLStrategyGhost), "vtctl", "ghost_col")
uuid := testAlterTable(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
uuid := testAlterTable(t, alterTableThrottlingStatement, string(schema.DDLStrategyGhost), "vtgate", "ghost_col")
uuid := testAlterTable(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning)
checkCancelMigration(t, uuid, true)
time.Sleep(2 * time.Second)
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
}
{
uuid := testAlterTable(t, alterTableFailedStatement, string(schema.DDLStrategyGhost), "vtgate", "ghost_col")
uuid := testAlterTable(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
Expand Down
38 changes: 20 additions & 18 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/google/uuid"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
)

Expand Down Expand Up @@ -76,39 +75,42 @@ const (
OnlineDDLStatusFailed OnlineDDLStatus = "failed"
)

// DDLStrategy suggests how an ALTER TABLE should run (e.g. "" for normal, "gh-ost" or "pt-osc")
type DDLStrategy string

const (
// DDLStrategyNormal means not an online-ddl migration. Just a normal MySQL ALTER TABLE
DDLStrategyNormal sqlparser.DDLStrategy = ""
DDLStrategyNormal DDLStrategy = ""
// DDLStrategyGhost requests gh-ost to run the migration
DDLStrategyGhost sqlparser.DDLStrategy = "gh-ost"
DDLStrategyGhost DDLStrategy = "gh-ost"
// DDLStrategyPTOSC requests pt-online-schema-change to run the migration
DDLStrategyPTOSC sqlparser.DDLStrategy = "pt-osc"
DDLStrategyPTOSC DDLStrategy = "pt-osc"
)

// OnlineDDL encapsulates the relevant information in an online schema change request
type OnlineDDL struct {
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy sqlparser.DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
Keyspace string `json:"keyspace,omitempty"`
Table string `json:"table,omitempty"`
Schema string `json:"schema,omitempty"`
SQL string `json:"sql,omitempty"`
UUID string `json:"uuid,omitempty"`
Strategy DDLStrategy `json:"strategy,omitempty"`
Options string `json:"options,omitempty"`
RequestTime int64 `json:"time_created,omitempty"`
Status OnlineDDLStatus `json:"status,omitempty"`
TabletAlias string `json:"tablet,omitempty"`
Retries int64 `json:"retries,omitempty"`
}

// ParseDDLStrategy validates the given ddl_strategy variable value , and parses the strategy and options parts.
func ParseDDLStrategy(strategyVariable string) (strategy sqlparser.DDLStrategy, options string, err error) {
func ParseDDLStrategy(strategyVariable string) (strategy DDLStrategy, options string, err error) {
strategyName := strategyVariable
if submatch := strategyParserRegexp.FindStringSubmatch(strategyVariable); len(submatch) > 0 {
strategyName = submatch[1]
options = submatch[2]
}

switch strategy = sqlparser.DDLStrategy(strategyName); strategy {
switch strategy = DDLStrategy(strategyName); strategy {
case DDLStrategyGhost, DDLStrategyPTOSC, DDLStrategyNormal:
return strategy, options, nil
default:
Expand Down Expand Up @@ -137,7 +139,7 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
func NewOnlineDDL(keyspace string, table string, sql string, strategy sqlparser.DDLStrategy, options string) (*OnlineDDL, error) {
func NewOnlineDDL(keyspace string, table string, sql string, strategy DDLStrategy, options string) (*OnlineDDL, error) {
u, err := CreateUUID()
if err != nil {
return nil, err
Expand Down
4 changes: 1 addition & 3 deletions go/vt/schema/online_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package schema
import (
"testing"

"vitess.io/vitess/go/vt/sqlparser"

"github.com/stretchr/testify/assert"
)

Expand All @@ -32,7 +30,7 @@ func TestCreateUUID(t *testing.T) {
func TestParseDDLStrategy(t *testing.T) {
tt := []struct {
strategyVariable string
strategy sqlparser.DDLStrategy
strategy DDLStrategy
options string
err error
}{
Expand Down
33 changes: 6 additions & 27 deletions go/vt/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,15 @@ limitations under the License.
package schema

import (
"fmt"
"regexp"
"strings"
)

var (
// ALTER TABLE
// ALTER WITH_GHOST TABLE
// ALTER WITH_GHOST LAG_'--max-lag-millis=2.5 --throttle-http=...' TABLE
// ALTER WITH_PT TABLE
alterTableBasicPattern = `(?s)(?i)\balter\s+(with\s+|\s+|).*?table\s+`
alterTableBasicPattern = `(?s)(?i)\balter\s+table\s+`
alterTableExplicitSchemaTableRegexps = []*regexp.Regexp{
// ALTER TABLE `scm`.`tbl` something
// ALTER WITH_GHOST TABLE `scm`.`tbl` something
// ALTER WITH_PT TABLE `scm`.`tbl` something
regexp.MustCompile(alterTableBasicPattern + "`" + `([^` + "`" + `]+)` + "`" + `[.]` + "`" + `([^` + "`" + `]+)` + "`" + `\s+(.*$)`),
// ALTER TABLE `scm`.tbl something
regexp.MustCompile(alterTableBasicPattern + "`" + `([^` + "`" + `]+)` + "`" + `[.]([\S]+)\s+(.*$)`),
Expand All @@ -55,33 +49,18 @@ func ParseAlterTableOptions(alterStatement string) (explicitSchema, explicitTabl
alterOptions = strings.TrimSpace(alterStatement)
for _, alterTableRegexp := range alterTableExplicitSchemaTableRegexps {
if submatch := alterTableRegexp.FindStringSubmatch(alterOptions); len(submatch) > 0 {
explicitSchema = submatch[2]
explicitTable = submatch[3]
alterOptions = submatch[4]
explicitSchema = submatch[1]
explicitTable = submatch[2]
alterOptions = submatch[3]
return explicitSchema, explicitTable, alterOptions
}
}
for _, alterTableRegexp := range alterTableExplicitTableRegexps {
if submatch := alterTableRegexp.FindStringSubmatch(alterOptions); len(submatch) > 0 {
explicitTable = submatch[2]
alterOptions = submatch[3]
explicitTable = submatch[1]
alterOptions = submatch[2]
return explicitSchema, explicitTable, alterOptions
}
}
return explicitSchema, explicitTable, alterOptions
}

// RemoveOnlineDDLHints removes a WITH_GHOST or WITH_PT hint, which is vitess-specific,
// from an ALTER TABLE statement
// e.g "ALTER WITH 'gh-ost' TABLE my_table DROP COLUMN i" -> "ALTER TABLE `my_table` DROP COLUMN i"
func RemoveOnlineDDLHints(alterStatement string) (normalizedAlterStatement string) {
explicitSchema, explicitTable, alterOptions := ParseAlterTableOptions(alterStatement)

if explicitTable == "" {
return alterOptions
}
if explicitSchema == "" {
return fmt.Sprintf("ALTER TABLE `%s` %s", explicitTable, alterOptions)
}
return fmt.Sprintf("ALTER TABLE `%s`.`%s` %s", explicitSchema, explicitTable, alterOptions)
}
59 changes: 12 additions & 47 deletions go/vt/schema/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,18 @@ func TestParseAlterTableOptions(t *testing.T) {
schema, table, options string
}
tests := map[string]expect{
"add column i int, drop column d": {schema: "", table: "", options: "add column i int, drop column d"},
" add column i int, drop column d ": {schema: "", table: "", options: "add column i int, drop column d"},
"alter table t add column i int, drop column d": {schema: "", table: "t", options: "add column i int, drop column d"},
"alter table t add column i int, drop column d": {schema: "", table: "t", options: "add column i int, drop column d"},
"alter table `t` add column i int, drop column d": {schema: "", table: "t", options: "add column i int, drop column d"},
"alter table `scm`.`t` add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter table `scm`.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter table scm.`t` add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter with 'gh-ost' table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
" alter with 'gh-ost' table scm.`t` add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter with 'pt-osc' table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter with 'gh-ost' '--some-option=5 --another-option=false' table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter with 'gh-ost' '--initially-drop-old-table' table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter with 'gh-ost' '--initially-drop-old-table --execute' table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"ALTER WITH 'gh-ost' TABLE scm.t ADD COLUMN i int, DROP COLUMN d": {schema: "scm", table: "t", options: "ADD COLUMN i int, DROP COLUMN d"},
"add column i int, drop column d": {schema: "", table: "", options: "add column i int, drop column d"},
" add column i int, drop column d ": {schema: "", table: "", options: "add column i int, drop column d"},
"alter table t add column i int, drop column d": {schema: "", table: "t", options: "add column i int, drop column d"},
"alter table t add column i int, drop column d": {schema: "", table: "t", options: "add column i int, drop column d"},
"alter table `t` add column i int, drop column d": {schema: "", table: "t", options: "add column i int, drop column d"},
"alter table `scm`.`t` add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter table `scm`.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter table scm.`t` add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"alter table scm.t add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
" alter table scm.`t` add column i int, drop column d": {schema: "scm", table: "t", options: "add column i int, drop column d"},
"ALTER table scm.t ADD COLUMN i int, DROP COLUMN d": {schema: "scm", table: "t", options: "ADD COLUMN i int, DROP COLUMN d"},
"ALTER TABLE scm.t ADD COLUMN i int, DROP COLUMN d": {schema: "scm", table: "t", options: "ADD COLUMN i int, DROP COLUMN d"},
}
for query, expect := range tests {
schema, table, options := ParseAlterTableOptions(query)
Expand All @@ -55,34 +51,3 @@ func TestParseAlterTableOptions(t *testing.T) {
}
}
}

func TestRemoveOnlineDDLHints(t *testing.T) {
tests := map[string]string{
"ALTER TABLE my_table DROP COLUMN i": "ALTER TABLE `my_table` DROP COLUMN i",
" ALTER TABLE my_table DROP COLUMN i": "ALTER TABLE `my_table` DROP COLUMN i",
"ALTER WITH 'gh-ost' TABLE my_table DROP COLUMN i": "ALTER TABLE `my_table` DROP COLUMN i",
"ALTER WITH 'pt-osc' TABLE `my_table` DROP COLUMN i": "ALTER TABLE `my_table` DROP COLUMN i",
"ALTER WITH 'pt-osc' TABLE scm.`my_table` DROP COLUMN i": "ALTER TABLE `scm`.`my_table` DROP COLUMN i",
"ALTER WITH 'pt-osc' TABLE `scm`.`my_table` DROP COLUMN i": "ALTER TABLE `scm`.`my_table` DROP COLUMN i",
"ALTER WITH 'gh-ost' TABLE `scm`.`my_table` DROP COLUMN i": "ALTER TABLE `scm`.`my_table` DROP COLUMN i",
`
ALTER WITH 'gh-ost'
TABLE scm.my_table
DROP COLUMN i
`: "ALTER TABLE `scm`.`my_table` DROP COLUMN i",
`
ALTER
WITH
'gh-ost'
TABLE scm.my_table DROP COLUMN i,
ADD j INT
`: "ALTER TABLE `scm`.`my_table` DROP COLUMN i," + `
ADD j INT`,
}
for query, expect := range tests {
normalizedQuery := RemoveOnlineDDLHints(query)
if normalizedQuery != expect {
t.Errorf("got: %+v, want:%+v", normalizedQuery, expect)
}
}
}
79 changes: 39 additions & 40 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,17 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) ([]*sqlparser.DDL, []*sqlpa
}

// IsOnlineSchemaDDL returns true if the query is an online schema change DDL
func (exec *TabletExecutor) isOnlineSchemaDDL(ddl *sqlparser.DDL) (isOnline bool, strategy sqlparser.DDLStrategy, options string) {
func (exec *TabletExecutor) isOnlineSchemaDDL(ddl *sqlparser.DDL) (isOnline bool, strategy schema.DDLStrategy, options string) {
if ddl == nil {
return false, strategy, options
}
if ddl.Action != sqlparser.AlterDDLAction {
return false, strategy, options
}
strategy, options, _ = schema.ParseDDLStrategy(exec.ddlStrategy)
if strategy != schema.DDLStrategyNormal {
return true, strategy, options
}
if ddl.OnlineHint != nil {
return ddl.OnlineHint.Strategy != schema.DDLStrategyNormal, ddl.OnlineHint.Strategy, ddl.OnlineHint.Options
}
return false, strategy, options
}

Expand Down Expand Up @@ -240,19 +240,8 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
}
}()

// We added the WITH_GHOST and WITH_PT hints to ALTER TABLE syntax, but these hints are
// obviously not accepted by MySQL.
// To run preflightSchemaChanges we must clean up such hints from the ALTER TABLE statement.
// Because our sqlparser does not do a complete parse of ALTER TABLE statements at this time,
// we resort to temporary regexp based parsing.
// TODO(shlomi): replace the below with sqlparser based reconstruction of the query,
// when sqlparser has a complete coverage of ALTER TABLE syntax
sqlsWithoutAlterTableHints := []string{}
for _, sql := range sqls {
sqlsWithoutAlterTableHints = append(sqlsWithoutAlterTableHints, schema.RemoveOnlineDDLHints(sql))
}
// Make sure the schema changes introduce a table definition change.
if err := exec.preflightSchemaChanges(ctx, sqlsWithoutAlterTableHints); err != nil {
if err := exec.preflightSchemaChanges(ctx, sqls); err != nil {
execResult.ExecutorErr = err.Error()
return &execResult
}
Expand All @@ -265,47 +254,57 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
execResult.ExecutorErr = err.Error()
return &execResult
}
strategy := schema.DDLStrategyNormal
options := ""
isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(nil)
tableName := ""
switch ddl := stat.(type) {
case *sqlparser.DDL:
tableName = ddl.Table.Name.String()
_, strategy, options = exec.isOnlineSchemaDDL(ddl)
isOnlineDDL, strategy, options = exec.isOnlineSchemaDDL(ddl)
}
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy)
if isOnlineDDL {
exec.executeOnlineDDL(ctx, &execResult, sql, tableName, strategy, options)
} else {
exec.executeOnAllTablets(ctx, &execResult, sql)
}
exec.wr.Logger().Infof("Received DDL request. strategy = %+v", strategy)
exec.executeOnAllTablets(ctx, &execResult, sql, tableName, strategy, options)
if len(execResult.FailedShards) > 0 {
break
}
}
return &execResult
}

func (exec *TabletExecutor) executeOnAllTablets(
// executeOnlineDDL submits an online DDL request; this runs on topo, not on tablets, and is a quick operation.
func (exec *TabletExecutor) executeOnlineDDL(
ctx context.Context, execResult *ExecuteResult, sql string,
tableName string, strategy sqlparser.DDLStrategy, options string,
tableName string, strategy schema.DDLStrategy, options string,
) {
if strategy != schema.DDLStrategyNormal {
onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options)
if err != nil {
execResult.ExecutorErr = err.Error()
return
}
conn, err := exec.wr.TopoServer().ConnForCell(ctx, topo.GlobalCell)
if err != nil {
execResult.ExecutorErr = fmt.Sprintf("online DDL ConnForCell error:%s", err.Error())
return
}
err = onlineDDL.WriteTopo(ctx, conn, schema.MigrationRequestsPath())
if err != nil {
execResult.ExecutorErr = err.Error()
}
exec.wr.Logger().Infof("UUID=%+v", onlineDDL.UUID)
exec.wr.Logger().Printf("%s\n", onlineDDL.UUID)
if strategy == schema.DDLStrategyNormal {
execResult.ExecutorErr = "Not an online DDL strategy"
return
}
onlineDDL, err := schema.NewOnlineDDL(exec.keyspace, tableName, sql, strategy, options)
if err != nil {
execResult.ExecutorErr = err.Error()
return
}
conn, err := exec.wr.TopoServer().ConnForCell(ctx, topo.GlobalCell)
if err != nil {
execResult.ExecutorErr = fmt.Sprintf("online DDL ConnForCell error:%s", err.Error())
return
}
err = onlineDDL.WriteTopo(ctx, conn, schema.MigrationRequestsPath())
if err != nil {
execResult.ExecutorErr = err.Error()
}
exec.wr.Logger().Infof("UUID=%+v", onlineDDL.UUID)
exec.wr.Logger().Printf("%s\n", onlineDDL.UUID)
}

// executeOnAllTablets runs a query on all tablets, synchronously. This can be a long running operation.
func (exec *TabletExecutor) executeOnAllTablets(
ctx context.Context, execResult *ExecuteResult, sql string,
) {
var wg sync.WaitGroup
numOfMasterTablets := len(exec.tablets)
wg.Add(numOfMasterTablets)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/schemamanager/tablet_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestIsOnlineSchemaDDL(t *testing.T) {
query string
ddlStrategy string
isOnlineDDL bool
strategy sqlparser.DDLStrategy
strategy schema.DDLStrategy
options string
}{
{
Expand Down
Loading