Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: ddl_strategy=direct #7172

Merged
merged 8 commits into from
Dec 22, 2020
24 changes: 10 additions & 14 deletions go/test/endtoend/onlineddl/onlineddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ var (
cell = "zone1"
schemaChangeDirectory = ""
totalTableCount = 4
ddlStrategyUnchanged = "-"
createTable = `
CREATE TABLE %s (
id bigint(20) NOT NULL,
Expand Down Expand Up @@ -166,10 +165,10 @@ func TestSchemaChange(t *testing.T) {
assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
testWithInitialSchema(t)
{
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyNormal), "vtctl", "non_online")
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online")
}
{
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, ddlStrategyUnchanged, "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
Expand Down Expand Up @@ -251,18 +250,16 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
}
} else {
var err error
ddlStrategyArg := ""
if ddlStrategy != ddlStrategyUnchanged {
ddlStrategyArg = ddlStrategy
}
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, ddlStrategyArg)
uuid, err = clusterInstance.VtctlclientProcess.ApplySchemaWithOutput(keyspaceName, sqlQuery, ddlStrategy)
assert.NoError(t, err)
}
uuid = strings.TrimSpace(uuid)
fmt.Println("# Generated UUID (for debug purposes):")
fmt.Printf("<%s>\n", uuid)

if ddlStrategy != string(schema.DDLStrategyNormal) {
strategy, _, err := schema.ParseDDLStrategy(ddlStrategy)
assert.NoError(t, err)
if !strategy.IsDirect() {
time.Sleep(time.Second * 20)
}

Expand Down Expand Up @@ -379,11 +376,10 @@ func vtgateExec(t *testing.T, ddlStrategy string, query string, expectError stri
require.Nil(t, err)
defer conn.Close()

if ddlStrategy != ddlStrategyUnchanged {
setSession := fmt.Sprintf("set @@ddl_strategy='%s'", ddlStrategy)
_, err := conn.ExecuteFetch(setSession, 1000, true)
assert.NoError(t, err)
}
setSession := fmt.Sprintf("set @@ddl_strategy='%s'", ddlStrategy)
_, err = conn.ExecuteFetch(setSession, 1000, true)
assert.NoError(t, err)

qr, err := conn.ExecuteFetch(query, 1000, true)
if expectError == "" {
require.NoError(t, err)
Expand Down
20 changes: 16 additions & 4 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,24 @@ const (
type DDLStrategy string

const (
// DDLStrategyNormal means not an online-ddl migration. Just a normal MySQL ALTER TABLE
DDLStrategyNormal DDLStrategy = ""
// DDLStrategyDirect means not an online-ddl migration. Just a normal MySQL ALTER TABLE
DDLStrategyDirect DDLStrategy = "direct"
// DDLStrategyGhost requests gh-ost to run the migration
DDLStrategyGhost DDLStrategy = "gh-ost"
// DDLStrategyPTOSC requests pt-online-schema-change to run the migration
DDLStrategyPTOSC DDLStrategy = "pt-osc"
)

// IsDirect returns true if this strategy is a direct strategy
// A strategy is direct if it's not explciitly one of the online DDL strategies
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit pick: explciitly

func (s DDLStrategy) IsDirect() bool {
switch s {
case DDLStrategyGhost, DDLStrategyPTOSC:
return false
}
return true
}

// OnlineDDL encapsulates the relevant information in an online schema change request
type OnlineDDL struct {
Keyspace string `json:"keyspace,omitempty"`
Expand All @@ -112,10 +122,12 @@ func ParseDDLStrategy(strategyVariable string) (strategy DDLStrategy, options st
}

switch strategy = DDLStrategy(strategyName); strategy {
case DDLStrategyGhost, DDLStrategyPTOSC, DDLStrategyNormal:
case "": // backwards compatiblity and to handle unspecified values
return DDLStrategyDirect, options, nil
case DDLStrategyGhost, DDLStrategyPTOSC, DDLStrategyDirect:
return strategy, options, nil
default:
return strategy, options, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy)
return DDLStrategyDirect, options, fmt.Errorf("Unknown online DDL strategy: '%v'", strategy)
}
}

Expand Down
16 changes: 15 additions & 1 deletion go/vt/schema/online_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,27 @@ func TestCreateUUID(t *testing.T) {
assert.NoError(t, err)
}

func TestIsDirect(t *testing.T) {
assert.True(t, DDLStrategyDirect.IsDirect())
assert.False(t, DDLStrategyGhost.IsDirect())
assert.False(t, DDLStrategyPTOSC.IsDirect())
assert.True(t, DDLStrategy("").IsDirect())
assert.False(t, DDLStrategy("gh-ost").IsDirect())
assert.False(t, DDLStrategy("pt-osc").IsDirect())
assert.True(t, DDLStrategy("something").IsDirect())
}

func TestParseDDLStrategy(t *testing.T) {
tt := []struct {
strategyVariable string
strategy DDLStrategy
options string
err error
}{
{
strategyVariable: "direct",
strategy: DDLStrategyDirect,
},
{
strategyVariable: "gh-ost",
strategy: DDLStrategyGhost,
Expand All @@ -44,7 +58,7 @@ func TestParseDDLStrategy(t *testing.T) {
strategy: DDLStrategyPTOSC,
},
{
strategy: DDLStrategyNormal,
strategy: DDLStrategyDirect,
},
{
strategyVariable: "gh-ost --max-load=Threads_running=100 --allow-master",
Expand Down
26 changes: 13 additions & 13 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,16 @@ func (exec *TabletExecutor) parseDDLs(sqls []string) ([]sqlparser.DDLStatement,
return parsedDDLs, parsedDBDDLs, nil
}

// IsOnlineSchemaDDL returns true if the query is an online schema change DDL
func (exec *TabletExecutor) isOnlineSchemaDDL(ddl sqlparser.DDLStatement) (isOnline bool, strategy schema.DDLStrategy, options string) {
if ddl == nil {
// IsOnlineSchemaDDL returns true if we expect to run a online schema change DDL
func (exec *TabletExecutor) isOnlineSchemaDDL() (isOnline bool, strategy schema.DDLStrategy, options string) {
strategy, options, err := schema.ParseDDLStrategy(exec.ddlStrategy)
if err != nil {
return false, strategy, options
}
strategy, options, _ = schema.ParseDDLStrategy(exec.ddlStrategy)
if strategy != schema.DDLStrategyNormal {
return true, strategy, options
if strategy.IsDirect() {
return false, strategy, options
}
return false, strategy, options
return true, strategy, options
}

// a schema change that satisfies any following condition is considered
Expand All @@ -183,7 +183,7 @@ func (exec *TabletExecutor) detectBigSchemaChanges(ctx context.Context, parsedDD
tableWithCount[tableSchema.Name] = tableSchema.RowCount
}
for _, ddl := range parsedDDLs {
if isOnline, _, _ := exec.isOnlineSchemaDDL(ddl); isOnline {
if isOnline, _, _ := exec.isOnlineSchemaDDL(); isOnline {
// Since this is an online schema change, there is no need to worry about big changes
continue
}
Expand Down Expand Up @@ -218,10 +218,10 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResu
if err != nil {
return err
}
switch ddl := stat.(type) {
switch stat.(type) {
case sqlparser.DDLStatement:
if isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(ddl); isOnlineDDL {
exec.wr.Logger().Infof("Received online DDL request. strategy=%+v", strategy)
if isOnlineDDL, strategy, options := exec.isOnlineSchemaDDL(); isOnlineDDL {
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy)
normalizedQueries, err := schema.NormalizeOnlineDDL(sql)
if err != nil {
return err
Expand All @@ -232,6 +232,7 @@ func (exec *TabletExecutor) executeSQL(ctx context.Context, sql string, execResu
return nil
}
}
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", schema.DDLStrategyDirect)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it is useful to log the sql too 🤷‍♂️

exec.executeOnAllTablets(ctx, execResult, sql)
return nil
}
Expand Down Expand Up @@ -276,7 +277,6 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
execResult.ExecutorErr = err.Error()
return &execResult
}

if len(execResult.FailedShards) > 0 {
break
}
Expand All @@ -289,7 +289,7 @@ func (exec *TabletExecutor) executeOnlineDDL(
ctx context.Context, execResult *ExecuteResult, sql string,
tableName string, strategy schema.DDLStrategy, options string,
) {
if strategy == schema.DDLStrategyNormal {
if strategy.IsDirect() {
execResult.ExecutorErr = "Not an online DDL strategy"
return
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/schemamanager/tablet_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ func TestIsOnlineSchemaDDL(t *testing.T) {
stmt, err := sqlparser.Parse(ts.query)
assert.NoError(t, err)

ddl, ok := stmt.(sqlparser.DDLStatement)
_, ok := stmt.(sqlparser.DDLStatement)
assert.True(t, ok)

isOnlineDDL, strategy, options := e.isOnlineSchemaDDL(ddl)
isOnlineDDL, strategy, options := e.isOnlineSchemaDDL()
assert.Equal(t, ts.isOnlineDDL, isOnlineDDL)
if isOnlineDDL {
assert.Equal(t, ts.strategy, strategy)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2456,7 +2456,7 @@ func commandApplySchema(ctx context.Context, wr *wrangler.Wrangler, subFlags *fl
allowLongUnavailability := subFlags.Bool("allow_long_unavailability", false, "Allow large schema changes which incur a longer unavailability of the database.")
sql := subFlags.String("sql", "", "A list of semicolon-delimited SQL commands")
sqlFile := subFlags.String("sql-file", "", "Identifies the file that contains the SQL commands")
ddlStrategy := subFlags.String("ddl_strategy", "", "Online DDL strategy, compatible with @@ddl_strategy session variable (examples: 'gh-ost', 'pt-osc', 'gh-ost --max-load=Threads_running=100'")
ddlStrategy := subFlags.String("ddl_strategy", string(schema.DDLStrategyDirect), "Online DDL strategy, compatible with @@ddl_strategy session variable (examples: 'gh-ost', 'pt-osc', 'gh-ost --max-load=Threads_running=100'")
waitReplicasTimeout := subFlags.Duration("wait_replicas_timeout", wrangler.DefaultWaitReplicasTimeout, "The amount of time to wait for replicas to receive the schema change via replication.")
if err := subFlags.Parse(args); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (v *DDL) GetTableName() string {
func (v *DDL) isOnlineSchemaDDL() bool {
switch v.DDL.GetAction() {
case sqlparser.AlterDDLAction:
return v.OnlineDDL.Strategy != schema.DDLStrategyNormal
return !v.OnlineDDL.Strategy.IsDirect()
}
return false
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var (
_ = flag.Bool("disable_local_gateway", false, "deprecated: if specified, this process will not route any queries to local tablets in the local cell")
maxMemoryRows = flag.Int("max_memory_rows", 300000, "Maximum number of rows that will be held in memory for intermediate results as well as the final result.")
warnMemoryRows = flag.Int("warn_memory_rows", 30000, "Warning threshold for in-memory results. A row count higher than this amount will cause the VtGateWarnings.ResultsExceeded counter to be incremented.")
defaultDDLStrategy = flag.String("ddl_strategy", "", "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")
defaultDDLStrategy = flag.String("ddl_strategy", string(schema.DDLStrategyDirect), "Set default strategy for DDL statements. Override with @@ddl_strategy session variable")

// TODO(deepthi): change these two vars to unexported and move to healthcheck.go when LegacyHealthcheck is removed

Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1525,6 +1525,11 @@ func (e *Executor) VExec(ctx context.Context, vx *vexec.TabletVExec) (qr *queryp
return sqltypes.ResultToProto3(result), nil
}

if err := e.initSchema(ctx); err != nil {
log.Error(err)
return nil, err
}

switch stmt := vx.Stmt.(type) {
case *sqlparser.Delete:
return nil, fmt.Errorf("DELETE statements not supported for this table. query=%s", vx.Query)
Expand Down