Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: DROP TABLE translated to RENAME TABLE statement #7221

Merged
merged 17 commits into from
Jan 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/mysql/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ const (
ERServerShutdown = 1053

// not found
ERCantFindFile = 1017
ERFormNotFound = 1029
ERKeyNotFound = 1032
ERBadFieldError = 1054
Expand Down
73 changes: 49 additions & 24 deletions go/test/endtoend/onlineddl/onlineddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ var (
) ENGINE=InnoDB;`
onlineDDLDropTableStatement = `
DROP TABLE %s`
onlineDDLDropTableIfExistsStatement = `
DROP TABLE IF EXISTS %s`
)

func fullWordUUIDRegexp(uuid, searchWord string) *regexp.Regexp {
Expand Down Expand Up @@ -164,41 +166,41 @@ func TestSchemaChange(t *testing.T) {
defer cluster.PanicHandler(t)
assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
testWithInitialSchema(t)
{
t.Run("create non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online")
}
{
})
t.Run("successful online alter, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
})
t.Run("successful online alter, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
})
t.Run("throttled migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning)
checkCancelMigration(t, uuid, true)
time.Sleep(2 * time.Second)
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
}
{
})
t.Run("failed migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
// migration will fail again
}
{
})
t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) {
// no migrations pending at this time
time.Sleep(10 * time.Second)
checkCancelAllMigrations(t, 0)
}
{
})
t.Run("cancel all migrations: some migrations to cancel", func(t *testing.T) {
// spawn n migrations; cancel them via cancel-all
var wg sync.WaitGroup
count := 4
Expand All @@ -211,19 +213,41 @@ func TestSchemaChange(t *testing.T) {
}
wg.Wait()
checkCancelAllMigrations(t, count)
}
{
})
t.Run("Online DROP, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtctl", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
{
})
t.Run("Online CREATE, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "gh-ost", "vtctl", "online_ddl_create_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
}
})
t.Run("Online DROP TABLE IF EXISTS, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
// this table existed
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 1)
})
t.Run("Online DROP TABLE IF EXISTS for nonexistent table, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
// this table did not exist
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 0)
})
t.Run("Online DROP TABLE for nonexistent table, expect error, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtgate", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
})
}

func testWithInitialSchema(t *testing.T) {
Expand All @@ -236,7 +260,7 @@ func testWithInitialSchema(t *testing.T) {
}

// Check if 4 tables are created
checkTables(t, totalTableCount)
checkTables(t, "", totalTableCount)
}

// testOnlineDDLStatement runs an online DDL, ALTER statement
Expand Down Expand Up @@ -270,17 +294,18 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
}

// checkTables checks the number of tables in the first two shards.
func checkTables(t *testing.T, count int) {
func checkTables(t *testing.T, showTableName string, expectCount int) {
for i := range clusterInstance.Keyspaces[0].Shards {
checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], count)
checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[i].Vttablets[0], showTableName, expectCount)
}
}

// checkTablesCount checks the number of tables in the given tablet
func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, count int) {
queryResult, err := tablet.VttabletProcess.QueryTablet("show tables;", keyspaceName, true)
func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName string, expectCount int) {
query := fmt.Sprintf(`show tables like '%%%s%%';`, showTableName)
queryResult, err := tablet.VttabletProcess.QueryTablet(query, keyspaceName, true)
require.Nil(t, err)
assert.Equal(t, len(queryResult.Rows), count)
assert.Equal(t, expectCount, len(queryResult.Rows))
}

// checkRecentMigrations checks 'OnlineDDL <keyspace> show recent' output. Example to such output:
Expand Down
23 changes: 17 additions & 6 deletions go/vt/schema/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"fmt"
"regexp"
"strings"
"time"

"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -151,18 +152,18 @@ func ReadTopo(ctx context.Context, conn topo.Conn, entryPath string) (*OnlineDDL
return onlineDDL, nil
}

// getOnlineDDLAction parses the given SQL into a statement and returns the action type of the DDL statement, or error
// ParseOnlineDDLStatement parses the given SQL into a statement and returns the action type of the DDL statement, or error
// if the statement is not a DDL
func getOnlineDDLAction(sql string) (action sqlparser.DDLAction, ddlStmt sqlparser.DDLStatement, err error) {
func ParseOnlineDDLStatement(sql string) (ddlStmt sqlparser.DDLStatement, action sqlparser.DDLAction, err error) {
stmt, err := sqlparser.Parse(sql)
if err != nil {
return action, ddlStmt, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", sql, err)
return nil, 0, fmt.Errorf("Error parsing statement: SQL=%s, error=%+v", sql, err)
}
switch ddlStmt := stmt.(type) {
case sqlparser.DDLStatement:
return ddlStmt.GetAction(), ddlStmt, nil
return ddlStmt, ddlStmt.GetAction(), nil
}
return action, ddlStmt, fmt.Errorf("Unsupported query type: %s", sql)
return ddlStmt, action, fmt.Errorf("Unsupported query type: %s", sql)
}

// NewOnlineDDL creates a schema change request with self generated UUID and RequestTime
Expand Down Expand Up @@ -201,7 +202,7 @@ func (onlineDDL *OnlineDDL) ToJSON() ([]byte, error) {

// GetAction extracts the DDL action type from the online DDL statement
func (onlineDDL *OnlineDDL) GetAction() (action sqlparser.DDLAction, err error) {
action, _, err = getOnlineDDLAction(onlineDDL.SQL)
_, action, err = ParseOnlineDDLStatement(onlineDDL.SQL)
return action, err
}

Expand Down Expand Up @@ -243,12 +244,22 @@ func (onlineDDL *OnlineDDL) WriteTopo(ctx context.Context, conn topo.Conn, baseP
return nil
}

// GetGCUUID gets this OnlineDDL UUID in GC UUID format
func (onlineDDL *OnlineDDL) GetGCUUID() string {
return OnlineDDLToGCUUID(onlineDDL.UUID)
}

// IsOnlineDDLUUID answers 'true' when the given string is an online-ddl UUID, e.g.:
// a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a
func IsOnlineDDLUUID(uuid string) bool {
return onlineDdlUUIDRegexp.MatchString(uuid)
}

// OnlineDDLToGCUUID converts a UUID in online-ddl format to GC-table format
func OnlineDDLToGCUUID(uuid string) string {
return strings.Replace(uuid, "_", "", -1)
}

// IsOnlineDDLTableName answers 'true' when the given table name _appears to be_ a name
// generated by an online DDL operation; either the name determined by the online DDL Executor, or
// by pt-online-schema-change.
Expand Down
13 changes: 13 additions & 0 deletions go/vt/schema/online_ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ func TestIsOnlineDDLUUID(t *testing.T) {
}
}

func TestGetGCUUID(t *testing.T) {
uuids := map[string]bool{}
count := 20
for i := 0; i < count; i++ {
onlineDDL, err := NewOnlineDDL("ks", "tbl", "alter table t drop column c", DDLStrategyDirect, "", "")
assert.NoError(t, err)
gcUUID := onlineDDL.GetGCUUID()
assert.True(t, IsGCUUID(gcUUID))
uuids[gcUUID] = true
}
assert.Equal(t, count, len(uuids))
}

func TestGetActionStr(t *testing.T) {
tt := []struct {
statement string
Expand Down
2 changes: 1 addition & 1 deletion go/vt/schema/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func ParseAlterTableOptions(alterStatement string) (explicitSchema, explicitTabl

// NormalizeOnlineDDL normalizes a given query for OnlineDDL, possibly exploding it into multiple distinct queries
func NormalizeOnlineDDL(sql string) (normalized []*NormalizedDDLQuery, err error) {
action, ddlStmt, err := getOnlineDDLAction(sql)
ddlStmt, action, err := ParseOnlineDDLStatement(sql)
if err != nil {
return normalized, err
}
Expand Down
41 changes: 30 additions & 11 deletions go/vt/schema/tablegc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ const (
)

var (
gcTableNameRegexp = regexp.MustCompile(`^_vt_(HOLD|PURGE|EVAC|DROP)_[0-f]{32}_([0-9]{14})$`)
gcUUIDRegexp = regexp.MustCompile(`^[0-f]{32}$`)
gcTableNameRegexp = regexp.MustCompile(`^_vt_(HOLD|PURGE|EVAC|DROP)_([0-f]{32})_([0-9]{14})$`)

gcStates = map[string]TableGCState{
string(HoldTableGCState): HoldTableGCState,
Expand All @@ -53,40 +54,58 @@ var (
}
)

func generateGCTableName(state TableGCState, t time.Time) (string, error) {
uuid, err := createUUID("")
// IsGCUUID answers 'true' when the given string is an GC UUID, e.g.:
// a0638f6bec7b11ea9bf8000d3a9b8a9a
func IsGCUUID(uuid string) bool {
return gcUUIDRegexp.MatchString(uuid)
}

// generateGCTableName creates a GC table name, based on desired state and time, and with optional preset UUID.
// If uuid is given, then it must be in GC-UUID format. If empty, the function auto-generates a UUID.
func generateGCTableName(state TableGCState, uuid string, t time.Time) (tableName string, err error) {
if uuid == "" {
uuid, err = createUUID("")
}
if err != nil {
return "", err
}
if !IsGCUUID(uuid) {
return "", fmt.Errorf("Not a valid GC UUID format: %s", uuid)
}
timestamp := ToReadableTimestamp(t)
return fmt.Sprintf("_vt_%s_%s_%s", state, uuid, timestamp), nil
}

// AnalyzeGCTableName analyzes a given table name to see if it's a GC table, and if so, parse out
// its state and timestamp
func AnalyzeGCTableName(tableName string) (isGCTable bool, state TableGCState, t time.Time, err error) {
// its state, uuid, and timestamp
func AnalyzeGCTableName(tableName string) (isGCTable bool, state TableGCState, uuid string, t time.Time, err error) {
submatch := gcTableNameRegexp.FindStringSubmatch(tableName)
if len(submatch) == 0 {
return false, state, t, nil
return false, state, uuid, t, nil
}
t, err = time.Parse(readableTimeFormat, submatch[2])
return true, TableGCState(submatch[1]), t, err
t, err = time.Parse(readableTimeFormat, submatch[3])
return true, TableGCState(submatch[1]), submatch[2], t, err
}

// IsGCTableName answers 'true' when the given table name stands for a GC table
func IsGCTableName(tableName string) bool {
return gcTableNameRegexp.MatchString(tableName)
}

// GenerateRenameStatement generates a "RENAME TABLE" statement, where a table is renamed to a GC table.
func GenerateRenameStatement(fromTableName string, state TableGCState, t time.Time) (statement string, toTableName string, err error) {
toTableName, err = generateGCTableName(state, t)
// GenerateRenameStatementWithUUID generates a "RENAME TABLE" statement, where a table is renamed to a GC table, with preset UUID
func GenerateRenameStatementWithUUID(fromTableName string, state TableGCState, uuid string, t time.Time) (statement string, toTableName string, err error) {
toTableName, err = generateGCTableName(state, uuid, t)
if err != nil {
return "", "", err
}
return fmt.Sprintf("RENAME TABLE `%s` TO %s", fromTableName, toTableName), toTableName, nil
}

// GenerateRenameStatement generates a "RENAME TABLE" statement, where a table is renamed to a GC table.
func GenerateRenameStatement(fromTableName string, state TableGCState, t time.Time) (statement string, toTableName string, err error) {
return GenerateRenameStatementWithUUID(fromTableName, state, "", t)
}

// ParseGCLifecycle parses a comma separated list of gc states and returns a map of indicated states
func ParseGCLifecycle(gcLifecycle string) (states map[TableGCState]bool, err error) {
states = make(map[TableGCState]bool)
Expand Down
21 changes: 12 additions & 9 deletions go/vt/schema/tablegc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestIsGCTableName(t *testing.T) {
states := []TableGCState{HoldTableGCState, PurgeTableGCState, EvacTableGCState, DropTableGCState}
for _, state := range states {
for i := 0; i < 10; i++ {
tableName, err := generateGCTableName(state, tm)
tableName, err := generateGCTableName(state, "", tm)
assert.NoError(t, err)
assert.True(t, IsGCTableName(tableName))
}
Expand Down Expand Up @@ -77,9 +77,10 @@ func TestAnalyzeGCTableName(t *testing.T) {
},
}
for _, ts := range tt {
isGC, state, tm, err := AnalyzeGCTableName(ts.tableName)
isGC, state, uuid, tm, err := AnalyzeGCTableName(ts.tableName)
assert.NoError(t, err)
assert.True(t, isGC)
assert.True(t, IsGCUUID(uuid))
assert.Equal(t, ts.state, state)
assert.Equal(t, ts.t, tm)
}
Expand Down Expand Up @@ -140,12 +141,14 @@ func TestParseGCLifecycle(t *testing.T) {
},
}
for _, ts := range tt {
states, err := ParseGCLifecycle(ts.lifecycle)
if ts.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, ts.states, states)
}
t.Run(ts.lifecycle, func(*testing.T) {
states, err := ParseGCLifecycle(ts.lifecycle)
if ts.expectErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, ts.states, states)
}
})
}
}
Loading