Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Onlineddl: formalize "immediate operations", respect --postpone-completion strategy flag #11910

Merged
merged 10 commits into from
Dec 11, 2022
44 changes: 40 additions & 4 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func TestSchemaChange(t *testing.T) {
// ALTER VIEW
t.Run("ALTER VIEW where view exists", func(t *testing.T) {
// The view exists
checkTable(t, viewName, true)
uuid := testOnlineDDLStatementForView(t, alterViewStatement, ddlStrategy, "vtgate", "success_alter")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
Expand Down Expand Up @@ -627,7 +628,8 @@ func TestSchemaChange(t *testing.T) {
checkMigratedTable(t, tableName, alterHints[0])
testSelectTableMetrics(t)
})
t.Run("postponed revert", func(t *testing.T) {
testPostponedRevert := func(t *testing.T, expectStatuses ...schema.OnlineDDLStatus) {
require.NotEmpty(t, expectStatuses)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
Expand All @@ -636,22 +638,56 @@ func TestSchemaChange(t *testing.T) {
defer wg.Done()
runMultipleConnections(ctx, t)
}()
uuid := testRevertMigration(t, uuids[len(uuids)-1], ddlStrategy+" -postpone-completion")
uuid := testRevertMigration(t, uuids[len(uuids)-1], ddlStrategy+" --postpone-completion")
uuids = append(uuids, uuid)
// Should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, expectStatuses...)
// Issue a complete and wait for successful completion
onlineddl.CheckCompleteMigration(t, &vtParams, shards, uuid, true)
// This part may take a while, because we depend on vreplicatoin polling
// This part may take a while, because we depend on vreplication polling
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 60*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
cancel() // will cause runMultipleConnections() to terminate
wg.Wait()
}
t.Run("postponed revert", func(t *testing.T) {
testPostponedRevert(t, schema.OnlineDDLStatusRunning)
checkMigratedTable(t, tableName, alterHints[1])
testSelectTableMetrics(t)
})

t.Run("postponed revert view", func(t *testing.T) {
t.Run("CREATE VIEW again", func(t *testing.T) {
// The view does not exist
uuid := testOnlineDDLStatementForView(t, createViewStatement, ddlStrategy, "vtgate", "success_create")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, viewName, true)
testRevertedUUID(t, uuid, "")
})
t.Run("ALTER VIEW, postpone completion", func(t *testing.T) {
// Technically this test better fits in `onlineddl_scheduler_test.go`, but since we've already laid the grounds here, this is where it landed.
// The view exists
checkTable(t, viewName, true)
uuid := testOnlineDDLStatementForView(t, alterViewStatement, ddlStrategy+" --postpone-completion", "vtgate", "success_create")
uuids = append(uuids, uuid)

onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
// Issue a complete and wait for successful completion
onlineddl.CheckCompleteMigration(t, &vtParams, shards, uuid, true)
// This part may take a while, because we depend on vreplication polling
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, 60*time.Second, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, viewName, true)
testRevertedUUID(t, uuid, "")
})
// now verify that the revert for ALTER VIEW respects `--postpone-completion`
testPostponedRevert(t, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
checkTable(t, viewName, true)
})

// INSTANT DDL
t.Run("INSTANT DDL: add column", func(t *testing.T) {
uuid := testOnlineDDLStatementForTable(t, "alter table stress_test add column i_instant int not null default 0", ddlStrategy+" --prefer-instant-ddl", "vtgate", "i_instant")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ var (
shards []cluster.Shard
vtParams mysql.ConnParams

normalWaitTime = 20 * time.Second
extendedWaitTime = 60 * time.Second
normalWaitTime = 20 * time.Second
extendedWaitTime = 60 * time.Second
ensureStateNotChangedTime = 5 * time.Second

hostname = "localhost"
keyspaceName = "ks"
Expand Down Expand Up @@ -79,6 +80,9 @@ var (
trivialAlterT2Statement = `
ALTER TABLE t2_test ENGINE=InnoDB;
`
instantAlterT1Statement = `
ALTER TABLE t1_test ADD COLUMN i0 INT NOT NULL DEFAULT 0;
`
dropT1Statement = `
DROP TABLE IF EXISTS t1_test
`
Expand Down Expand Up @@ -161,6 +165,10 @@ func TestSchemaChange(t *testing.T) {
shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))

mysqlVersion := onlineddl.GetMySQLVersion(t, clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet())
require.NotEmpty(t, mysqlVersion)
_, capableOf, _ := mysql.GetFlavor(mysqlVersion, nil)

var t1uuid string
var t2uuid string

Expand Down Expand Up @@ -313,7 +321,7 @@ func TestSchemaChange(t *testing.T) {
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
// now that t1 is running, let's unblock t2. We expect it to remain queued.
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t2uuid, true)
time.Sleep(5 * time.Second)
time.Sleep(ensureStateNotChangedTime)
// t1 should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
// non-concurrent -- should be queued!
Expand Down Expand Up @@ -345,7 +353,7 @@ func TestSchemaChange(t *testing.T) {
t.Run("expect both running", func(t *testing.T) {
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusRunning)
time.Sleep(5 * time.Second)
time.Sleep(ensureStateNotChangedTime)
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)
Expand Down Expand Up @@ -384,7 +392,7 @@ func TestSchemaChange(t *testing.T) {
// since all migrations are throttled, t1 migration is not ready_to_complete, hence
// t2 should not be running
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, normalWaitTime, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
time.Sleep(5 * time.Second)
time.Sleep(ensureStateNotChangedTime)
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
Expand All @@ -393,7 +401,7 @@ func TestSchemaChange(t *testing.T) {
onlineddl.UnthrottleAllMigrations(t, &vtParams)
// t1 should now be ready_to_complete, hence t2 should start running
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t2uuid, extendedWaitTime, schema.OnlineDDLStatusRunning)
time.Sleep(5 * time.Second)
time.Sleep(ensureStateNotChangedTime)
// both should be still running!
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t2uuid, schema.OnlineDDLStatusRunning)
Expand Down Expand Up @@ -566,7 +574,7 @@ func TestSchemaChange(t *testing.T) {
})
drop1uuid := testOnlineDDLStatement(t, dropT1Statement, ddlStrategy+" -allow-concurrent", "vtgate", "", "", true) // skip wait
t.Run("t1drop blocked", func(t *testing.T) {
time.Sleep(5 * time.Second)
time.Sleep(ensureStateNotChangedTime)
// drop1 migration should block. It can run concurrently to t1, but conflicts on table name
onlineddl.CheckMigrationStatus(t, &vtParams, shards, drop1uuid, schema.OnlineDDLStatusReady)
})
Expand Down Expand Up @@ -639,6 +647,28 @@ func TestSchemaChange(t *testing.T) {
}
})
})
// INSTANT DDL
instantDDLCapable, err := capableOf(mysql.InstantAddLastColumnFlavorCapability)
require.NoError(t, err)
if instantDDLCapable {
t.Run("INSTANT DDL: postpone-completion", func(t *testing.T) {
t1uuid := testOnlineDDLStatement(t, instantAlterT1Statement, ddlStrategy+" --prefer-instant-ddl --postpone-completion", "vtgate", "", "", true)

t.Run("expect t1 queued", func(t *testing.T) {
// we want to validate that the migration remains queued even after some time passes. It must not move beyond 'queued'
time.Sleep(ensureStateNotChangedTime)
onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusQueued, schema.OnlineDDLStatusReady)
})
t.Run("complete t1", func(t *testing.T) {
// Issue a complete and wait for successful completion
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
})
})
}
}

// testOnlineDDLStatement runs an online DDL, ALTER statement
Expand Down
Loading