Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for ALTER VITESS_MIGRATION statements #7663

Merged
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
dfb2a2f
parser support for ALTER VITESS_MIGRATION
shlomi-noach Mar 10, 2021
f956f72
ALTER VITESS_MIGRATION plan is Send
shlomi-noach Mar 10, 2021
365cc5d
rename migration plan file
shlomi-noach Mar 10, 2021
fde7f83
Format; test parsing of ALTER VITESS_MIGRATION statements
shlomi-noach Mar 10, 2021
0e9566c
Support for ALTER VITESS_MIGRATION plan in vttablet; implementation f…
shlomi-noach Mar 10, 2021
785d1ac
using ALTER VITESS_MIGRATION ... RETRY in endtoend/onlineddl/vrepl
shlomi-noach Mar 10, 2021
ffb2972
using ALTER VITESS_MIGRATION ... RETRY in endtoend/onlineddl/ghost
shlomi-noach Mar 10, 2021
90ba772
CancelMigration and CancelAllMigrations are public functions
shlomi-noach Mar 10, 2021
c77e01d
using ALTER VITESS_MIGRATION ... CANCEL in endtoend/onlineddl/ghost
shlomi-noach Mar 10, 2021
9a55059
using ALTER VITESS_MIGRATION ... CANCEL in endtoend/onlineddl/vrepl
shlomi-noach Mar 11, 2021
d45b7cb
using ALTER VITESS_MIGRATION ... CANCEL ALL in endtoend/onlineddl/
shlomi-noach Mar 11, 2021
9671f3e
using ALTER VITESS_MIGRATION ... CANCEL ALL in endtoend/onlineddl/
shlomi-noach Mar 11, 2021
5be1535
using ALTER VITESS_MIGRATION ... CANCEL ALL in endtoend/onlineddl/
shlomi-noach Mar 11, 2021
b86b21e
refactor: CheckMigrationStatus in endtoend/onlineddl
shlomi-noach Mar 11, 2021
384088b
refactor: VtgateExecDDL in endtoend/onlineddl
shlomi-noach Mar 11, 2021
b127d93
fix expect number of migrations to cancel
shlomi-noach Mar 11, 2021
9aa7fd0
merge master, resolve conflicts
shlomi-noach Mar 11, 2021
542c415
fix test assertion
shlomi-noach Mar 11, 2021
b6fb4ee
make sizegen
shlomi-noach Mar 11, 2021
72211cd
merge master, resolve conflict
shlomi-noach Mar 14, 2021
1e54273
sizegen
shlomi-noach Mar 14, 2021
5b882d4
copyright year
shlomi-noach Mar 14, 2021
9561fce
merge master, resolve conflicts
shlomi-noach Mar 15, 2021
ca6eb9a
Merge branch 'master' into online-ddl-sql-interface-alter
shlomi-noach Mar 15, 2021
89cb0bf
Merge branch 'master' into online-ddl-sql-interface-alter
shlomi-noach Mar 16, 2021
f69f095
endtoend: simplify TestQueryPlanCache
vmg Mar 16, 2021
1000527
merge master, resolve conflicts
shlomi-noach Mar 17, 2021
4db306a
merge master, resolve conflicts
shlomi-noach Mar 17, 2021
3b5f9cd
merge master, resolve conflicts
shlomi-noach Mar 17, 2021
4b64dbf
fixes per review
shlomi-noach Mar 17, 2021
5a66255
Merge remote-tracking branch 'upstream/master' into online-ddl-sql-in…
systay Mar 18, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 32 additions & 129 deletions go/test/endtoend/onlineddl/ghost/onlineddl_ghost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,16 @@ limitations under the License.
package ghost

import (
"context"
"flag"
"fmt"
"os"
"path"
"regexp"
"strings"
"sync"
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/test/endtoend/cluster"
Expand Down Expand Up @@ -133,10 +130,6 @@ var (
`
)

func fullWordRegexp(searchWord string) *regexp.Regexp {
return regexp.MustCompile(`.*?\b` + searchWord + `\b`)
}

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand Down Expand Up @@ -205,41 +198,42 @@ func TestMain(m *testing.M) {

func TestSchemaChange(t *testing.T) {
defer cluster.PanicHandler(t)
assert.Equal(t, 2, len(clusterInstance.Keyspaces[0].Shards))
shards := clusterInstance.Keyspaces[0].Shards
assert.Equal(t, 2, len(shards))
testWithInitialSchema(t)
t.Run("create non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), "vtctl", "non_online")
})
t.Run("successful online alter, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableSuccessfulStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("successful online alter, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "gh-ost", "vtctl", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("throttled migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableThrottlingStatement, "gh-ost --max-load=Threads_running=1", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusRunning)
checkCancelMigration(t, uuid, true)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, true)
time.Sleep(2 * time.Second)
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
})
t.Run("failed migration", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, alterTableFailedStatement, "gh-ost", "vtgate", "ghost_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true)
// migration will fail again
})
t.Run("cancel all migrations: nothing to cancel", func(t *testing.T) {
// no migrations pending at this time
time.Sleep(10 * time.Second)
checkCancelAllMigrations(t, 0)
onlineddl.CheckCancelAllMigrations(t, &vtParams, 0)
})
t.Run("cancel all migrations: some migrations to cancel", func(t *testing.T) {
// spawn n migrations; cancel them via cancel-all
Expand All @@ -253,41 +247,41 @@ func TestSchemaChange(t *testing.T) {
}()
}
wg.Wait()
checkCancelAllMigrations(t, count)
onlineddl.CheckCancelAllMigrations(t, &vtParams, len(shards)*count)
})
t.Run("Online DROP, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtctl", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Online CREATE, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLCreateTableStatement, "gh-ost", "vtctl", "online_ddl_create_col")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
})
t.Run("Online DROP TABLE IF EXISTS, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
// this table existed
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 1)
})
t.Run("Online DROP TABLE IF EXISTS for nonexistent table, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableIfExistsStatement, "gh-ost", "vtgate", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusComplete)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, false)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, false)
// this table did not exist
checkTables(t, schema.OnlineDDLToGCUUID(uuid), 0)
})
t.Run("Online DROP TABLE for nonexistent table, expect error, vtgate", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "gh-ost", "vtgate", "")
checkRecentMigrations(t, uuid, schema.OnlineDDLStatusFailed)
checkCancelMigration(t, uuid, false)
checkRetryMigration(t, uuid, true)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusFailed)
onlineddl.CheckCancelMigration(t, &vtParams, shards, uuid, false)
onlineddl.CheckRetryMigration(t, &vtParams, shards, uuid, true)
})
}

Expand All @@ -309,7 +303,7 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
tableName := fmt.Sprintf("vt_onlineddl_test_%02d", 3)
sqlQuery := fmt.Sprintf(alterStatement, tableName)
if executeStrategy == "vtgate" {
row := vtgateExec(t, ddlStrategy, sqlQuery, "").Named().Row()
row := onlineddl.VtgateExecDDL(t, &vtParams, ddlStrategy, sqlQuery, "").Named().Row()
if row != nil {
uuid = row.AsString("uuid", "")
}
Expand Down Expand Up @@ -349,75 +343,6 @@ func checkTablesCount(t *testing.T, tablet *cluster.Vttablet, showTableName stri
assert.Equal(t, expectCount, len(queryResult.Rows))
}

// checkRecentMigrations checks 'OnlineDDL <keyspace> show recent' output. Example to such output:
// +------------------+-------+--------------+----------------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// | Tablet | shard | mysql_schema | mysql_table | migration_uuid | strategy | started_timestamp | completed_timestamp | migration_status |
// +------------------+-------+--------------+----------------------+--------------------------------------+----------+---------------------+---------------------+------------------+
// | zone1-0000003880 | 0 | vt_ks | vt_onlineddl_test_03 | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | gh-ost | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete |
// | zone1-0000003884 | 1 | vt_ks | vt_onlineddl_test_03 | a0638f6b_ec7b_11ea_9bf8_000d3a9b8a9a | gh-ost | 2020-09-01 17:50:40 | 2020-09-01 17:50:41 | complete |
// +------------------+-------+--------------+----------------------+--------------------------------------+----------+---------------------+---------------------+------------------+

func checkRecentMigrations(t *testing.T, uuid string, expectStatus schema.OnlineDDLStatus) {
showQuery := fmt.Sprintf("show vitess_migrations like '%s'", uuid)
r := onlineddl.VtgateExecQuery(t, &vtParams, showQuery, "")
fmt.Printf("# output for `%s`:\n", showQuery)
onlineddl.PrintQueryResult(os.Stdout, r)

count := 0
for _, row := range r.Named().Rows {
if row["migration_uuid"].ToString() == uuid && row["migration_status"].ToString() == string(expectStatus) {
count++
}
}
assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), count)
}

// checkCancelMigration attempts to cancel a migration, and expects rejection
func checkCancelMigration(t *testing.T, uuid string, expectCancelPossible bool) {
result, err := clusterInstance.VtctlclientProcess.OnlineDDLCancelMigration(keyspaceName, uuid)
fmt.Println("# 'vtctlclient OnlineDDL cancel <uuid>' output (for debug purposes):")
fmt.Println(result)
assert.NoError(t, err)

var r *regexp.Regexp
if expectCancelPossible {
r = fullWordRegexp("1")
} else {
r = fullWordRegexp("0")
}
m := r.FindAllString(result, -1)
assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}

// checkCancelAllMigrations all pending migrations
func checkCancelAllMigrations(t *testing.T, expectCount int) {
result, err := clusterInstance.VtctlclientProcess.OnlineDDLCancelAllMigrations(keyspaceName)
fmt.Println("# 'vtctlclient OnlineDDL cancel-all' output (for debug purposes):")
fmt.Println(result)
assert.NoError(t, err)

r := fullWordRegexp(fmt.Sprintf("%d", expectCount))
m := r.FindAllString(result, -1)
assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}

// checkRetryMigration attempts to retry a migration, and expects rejection
func checkRetryMigration(t *testing.T, uuid string, expectRetryPossible bool) {
result, err := clusterInstance.VtctlclientProcess.OnlineDDLRetryMigration(keyspaceName, uuid)
fmt.Println("# 'vtctlclient OnlineDDL retry <uuid>' output (for debug purposes):")
fmt.Println(result)
assert.NoError(t, err)

var r *regexp.Regexp
if expectRetryPossible {
r = fullWordRegexp("1")
} else {
r = fullWordRegexp("0")
}
m := r.FindAllString(result, -1)
assert.Equal(t, len(clusterInstance.Keyspaces[0].Shards), len(m))
}

// checkMigratedTables checks the CREATE STATEMENT of a table after migration
func checkMigratedTable(t *testing.T, tableName, expectColumn string) {
for i := range clusterInstance.Keyspaces[0].Shards {
Expand All @@ -436,25 +361,3 @@ func getCreateTableStatement(t *testing.T, tablet *cluster.Vttablet, tableName s
statement = queryResult.Rows[0][1].ToString()
return statement
}

func vtgateExec(t *testing.T, ddlStrategy string, query string, expectError string) *sqltypes.Result {
t.Helper()

ctx := context.Background()
conn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer conn.Close()

setSession := fmt.Sprintf("set @@ddl_strategy='%s'", ddlStrategy)
_, err = conn.ExecuteFetch(setSession, 1000, true)
assert.NoError(t, err)

qr, err := conn.ExecuteFetch(query, 1000, true)
if expectError == "" {
require.NoError(t, err)
} else {
require.Error(t, err, "error should not be nil")
assert.Contains(t, err.Error(), expectError, "Unexpected error")
}
return qr
}
Loading