Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CREATE, DROP statements in ApplySchema and online DDL #7083

Merged
60 changes: 41 additions & 19 deletions go/test/endtoend/onlineddl/onlineddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,33 +48,41 @@ var (
ddlStrategyUnchanged = "-"
createTable = `
CREATE TABLE %s (
id BIGINT(20) not NULL,
msg varchar(64),
PRIMARY KEY (id)
id bigint(20) NOT NULL,
msg varchar(64),
PRIMARY KEY (id)
) ENGINE=InnoDB;`
// To verify non online-DDL behavior
alterTableNormalStatement = `
ALTER TABLE %s
ADD COLUMN non_online INT UNSIGNED NOT NULL`
ADD COLUMN non_online int UNSIGNED NOT NULL`
// A trivial statement which must succeed and does not change the schema
alterTableTrivialStatement = `
ALTER TABLE %s
ENGINE=InnoDB`
ENGINE=InnoDB`
// The following statement is valid
alterTableSuccessfulStatement = `
ALTER TABLE %s
MODIFY id BIGINT UNSIGNED NOT NULL,
ADD COLUMN ghost_col INT NOT NULL,
ADD INDEX idx_msg(msg)`
MODIFY id bigint UNSIGNED NOT NULL,
ADD COLUMN ghost_col int NOT NULL,
ADD INDEX idx_msg(msg)`
// The following statement will fail because gh-ost requires some shared unique key
alterTableFailedStatement = `
ALTER TABLE %s
DROP PRIMARY KEY,
DROP COLUMN ghost_col`
DROP PRIMARY KEY,
DROP COLUMN ghost_col`
// We will run this query with "gh-ost --max-load=Threads_running=1"
alterTableThrottlingStatement = `
ALTER TABLE %s
DROP COLUMN ghost_col`
DROP COLUMN ghost_col`
onlineDDLCreateTableStatement = `
CREATE TABLE %s (
id bigint NOT NULL,
online_ddl_create_col INT NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB;`
onlineDDLDropTableStatement = `
DROP TABLE %s`
)

func fullWordUUIDRegexp(uuid, searchWord string) *regexp.Regexp {
Expand Down Expand Up @@ -156,33 +164,45 @@ func TestSchemaChange(t *testing.T) {
assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
testWithInitialSchema(t)
{
_ = testAlterTable(t, alterTableNormalStatement, string(schema.DDLStrategyNormal), "vtctl", "non_online")
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyNormal), "vtctl", "non_online")
}
{
uuid := testAlterTable(t, alterTableSuccessfulStatement, ddlStrategyUnchanged, "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, ddlStrategyUnchanged, "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
uuid := testAlterTable(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
uuid := testAlterTable(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning)
checkCancelMigration(t, uuid, true)
time.Sleep(2 * time.Second)
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
}
{
uuid := testAlterTable(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col")
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
}
{
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtctl", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "gh-ost", "vtctl", "online_ddl_create_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
}

func testWithInitialSchema(t *testing.T) {
Expand All @@ -198,8 +218,8 @@ func testWithInitialSchema(t *testing.T) {
checkTables(t, totalTableCount)
}

// testAlterTable runs an online DDL, ALTER statement
func testAlterTable(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectColumn string) (uuid string) {
// testOnlineDDLStatement runs an online DDL, ALTER statement
func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy string, executeStrategy string, expectColumn string) (uuid string) {
tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
sqlQuery := fmt.Sprintf(alterStatement, tableName)
if executeStrategy == "vtgate" {
Expand All @@ -224,7 +244,9 @@ func testAlterTable(t *testing.T, alterStatement string, ddlStrategy string, exe
time.Sleep(time.Second * 20)
}

checkMigratedTable(t, tableName, expectColumn)
if expectColumn != "" {
checkMigratedTable(t, tableName, expectColumn)
}
return uuid
}

Expand Down
12 changes: 8 additions & 4 deletions go/vt/schemamanager/tablet_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@ func (exec *TabletExecutor) isOnlineSchemaDDL(ddl sqlparser.DDLStatement) (isOnl
if ddl == nil {
return false, strategy, options
}
if ddl.GetAction() != sqlparser.AlterDDLAction {
return false, strategy, options
}
strategy, options, _ = schema.ParseDDLStrategy(exec.ddlStrategy)
if strategy != schema.DDLStrategyNormal {
return true, strategy, options
Expand Down Expand Up @@ -260,7 +257,14 @@ func (exec *TabletExecutor) Execute(ctx context.Context, sqls []string) *Execute
tableName := ""
switch ddl := stat.(type) {
case sqlparser.DDLStatement:
tableName = ddl.GetTable().Name.String()
switch ddl.GetAction() {
case sqlparser.DropDDLAction:
// TODO (shlomi): break into distinct per-table DROP statements; on a future PR where
// we implement lazy DROP TABLE on Online DDL
tableName = ddl.GetFromTables()[0].Name.String()
default:
tableName = ddl.GetTable().Name.String()
}
isOnlineDDL, strategy, options = exec.isOnlineSchemaDDL(ddl)
}
exec.wr.Logger().Infof("Received DDL request. strategy=%+v", strategy)
Expand Down
3 changes: 2 additions & 1 deletion go/vt/schemamanager/tablet_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ func TestIsOnlineSchemaDDL(t *testing.T) {
{
query: "CREATE TABLE t(id int)",
ddlStrategy: "gh-ost",
isOnlineDDL: false,
isOnlineDDL: true,
strategy: schema.DDLStrategyGhost,
},
{
query: "ALTER TABLE t ADD COLUMN i INT",
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtctld/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re
req := struct {
Keyspace, SQL string
ReplicaTimeoutSeconds int
DDLStrategy string `json:"ddl_strategy,omitempty"`
}{}
if err := unmarshalRequest(r, &req); err != nil {
return fmt.Errorf("can't unmarshal request: %v", err)
Expand All @@ -635,6 +636,10 @@ func initAPI(ctx context.Context, ts *topo.Server, actions *ActionRepository, re
requestContext := fmt.Sprintf("vtctld/api:%s", apiCallUUID)
executor := schemamanager.NewTabletExecutor(requestContext, wr, time.Duration(req.ReplicaTimeoutSeconds)*time.Second)

if err := executor.SetDDLStrategy(req.DDLStrategy); err != nil {
return fmt.Errorf("error setting DDL strategy: %v", err)
}

return schemamanager.Run(ctx,
schemamanager.NewUIController(req.SQL, req.Keyspace, w), executor)
})
Expand Down
Loading