resolve conflict (vitessio#1603)
shlomi-noach authored Feb 23, 2023
1 parent 3984ace commit 2a249de
Showing 10 changed files with 353 additions and 108 deletions.
16 changes: 16 additions & 0 deletions go/mysql/sql_error_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package mysql

import (
"fmt"
"testing"

"vitess.io/vitess/go/vt/proto/vtrpc"
@@ -151,6 +152,21 @@ func TestNewSQLErrorFromError(t *testing.T) {
num: ERNoDb,
ss: SSNoDB,
},
{
err: fmt.Errorf("just some random text here"),
num: ERUnknownError,
ss: SSUnknownSQLState,
},
{
err: fmt.Errorf("task error: Column 'val' cannot be null (errno 1048) (sqlstate 23000) during query: insert into _edf4846d_ab65_11ed_abb1_0a43f95f28a3_20230213061619_vrepl(id,val,ts) values (1,2,'2023-02-13 04:46:16'), (2,3,'2023-02-13 04:46:16'), (3,null,'2023-02-13 04:46:16')"),
num: ERBadNullError,
ss: SSConstraintViolation,
},
{
err: vterrors.Wrapf(fmt.Errorf("Column 'val' cannot be null (errno 1048) (sqlstate 23000) during query: insert into _edf4846d_ab65_11ed_abb1_0a43f95f28a3_20230213061619_vrepl(id,val,ts) values (1,2,'2023-02-13 04:46:16'), (2,3,'2023-02-13 04:46:16'), (3,null,'2023-02-13 04:46:16')"), "task error: %d", 17),
num: ERBadNullError,
ss: SSConstraintViolation,
},
}

for _, tc := range tCases {
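
The three added cases check that NewSQLErrorFromError can recover the MySQL error number and SQL state embedded in wrapped error text, including text wrapped again by vterrors.Wrapf. As a rough, self-contained sketch of that kind of parsing — an illustration only, not the actual go/mysql implementation; the helper name, regexp, and fallback constants below are assumptions drawn from MySQL conventions — one could write:

    package main

    import (
    	"fmt"
    	"regexp"
    	"strconv"
    )

    // errnoSQLStateRe matches the "(errno N) (sqlstate S)" suffix that MySQL
    // client errors carry, even after the error has been wrapped with extra text.
    var errnoSQLStateRe = regexp.MustCompile(`\(errno ([0-9]+)\) \(sqlstate ([0-9A-Za-z]+)\)`)

    // extractErrnoSQLState pulls the error number and SQL state out of arbitrary
    // error text, falling back to MySQL's generic unknown-error values when the
    // suffix is absent (as in the "just some random text here" case above).
    func extractErrnoSQLState(err error) (num int, sqlState string) {
    	const (
    		erUnknownError    = 1105    // ER_UNKNOWN_ERROR
    		ssUnknownSQLState = "HY000" // generic SQL state
    	)
    	m := errnoSQLStateRe.FindStringSubmatch(err.Error())
    	if m == nil {
    		return erUnknownError, ssUnknownSQLState
    	}
    	n, convErr := strconv.Atoi(m[1])
    	if convErr != nil {
    		return erUnknownError, ssUnknownSQLState
    	}
    	return n, m[2]
    }

    func main() {
    	err := fmt.Errorf("task error: Column 'val' cannot be null (errno 1048) (sqlstate 23000) during query: insert into t(id,val) values (3,null)")
    	num, ss := extractErrnoSQLState(err)
    	fmt.Println(num, ss) // prints: 1048 23000
    }
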
144 changes: 130 additions & 14 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
@@ -171,8 +171,7 @@ func TestMain(m *testing.M) {
}

clusterInstance.VtTabletExtraArgs = []string{
"--enable-lag-throttler",
"--throttle_threshold", "1s",
"--throttler-config-via-topo",
"--heartbeat_enable",
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
@@ -253,6 +252,12 @@ func TestSchemaChange(t *testing.T) {

providedUUID := ""
providedMigrationContext := ""

t.Run("enabling throttler with default threshold", func(t *testing.T) {
_, err := onlineddl.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "", false)
assert.NoError(t, err)
})

testWithInitialSchema(t)
t.Run("alter non_online", func(t *testing.T) {
_ = testOnlineDDLStatement(t, alterTableNormalStatement, string(schema.DDLStrategyDirect), providedUUID, providedMigrationContext, "vtctl", "non_online", "", false)
@@ -487,7 +492,9 @@ func TestSchemaChange(t *testing.T) {
// reparent shard -80 to replica
// and then reparent it back to original state
// (two pretty much identical tests, the point is to end up with original state)
for currentPrimaryTabletIndex, reparentTabletIndex := range []int{1, 0} {
for _, currentPrimaryTabletIndex := range []int{0, 1} {
currentPrimaryTablet := shards[0].Vttablets[currentPrimaryTabletIndex]
reparentTablet := shards[0].Vttablets[1-currentPrimaryTabletIndex]
t.Run(fmt.Sprintf("PlannedReparentShard via throttling %d/2", (currentPrimaryTabletIndex+1)), func(t *testing.T) {

insertRows(t, 2)
@@ -498,8 +505,8 @@ func TestSchemaChange(t *testing.T) {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = throttleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName)
defer unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName)
body, err = throttleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
defer unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
@@ -511,12 +518,19 @@ func TestSchemaChange(t *testing.T) {
}
uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

t.Run("wait for migration and vreplication to run", func(t *testing.T) {
t.Run("wait for migration to run", func(t *testing.T) {
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
time.Sleep(5 * time.Second) // wait for _vt.vreplication to be created
vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards, uuid, normalMigrationWait, "Copying")
})
t.Run("wait for vreplication to run on shard -80", func(t *testing.T) {
vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, currentPrimaryTablet, uuid, normalMigrationWait, "Copying", "Running")
require.Contains(t, []string{"Copying", "Running"}, vreplStatus)
})
t.Run("wait for vreplication to run on shard 80-", func(t *testing.T) {
vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards[1].Vttablets[0], uuid, normalMigrationWait, "Copying", "Running")
require.Contains(t, []string{"Copying", "Running"}, vreplStatus)
})
t.Run("check status again", func(t *testing.T) {
// again see that we're still 'running'
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
testRows(t)
@@ -533,7 +547,7 @@ func TestSchemaChange(t *testing.T) {

switch shard {
case "-80":
require.Equal(t, shards[0].Vttablets[currentPrimaryTabletIndex].Alias, tablet)
require.Equal(t, currentPrimaryTablet.Alias, tablet)
case "80-":
require.Equal(t, shards[1].Vttablets[0].Alias, tablet)
default:
@@ -543,18 +557,20 @@ func TestSchemaChange(t *testing.T) {
})
t.Run("PRS shard -80", func(t *testing.T) {
// migration has started and is throttled. We now run PRS
err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", keyspaceName+"/-80", "--new_primary", shards[0].Vttablets[reparentTabletIndex].Alias)
err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", keyspaceName+"/-80", "--new_primary", reparentTablet.Alias)
require.NoError(t, err, "failed PRS: %v", err)
rs := onlineddl.VtgateExecQuery(t, &vtParams, "show vitess_tablets", "")
onlineddl.PrintQueryResult(os.Stdout, rs)
})
t.Run("unthrottle and expect completion", func(t *testing.T) {
t.Run("unthrottle", func(t *testing.T) {
for i := range shards {
var body string
var err error
switch i {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = unthrottleApp(shards[i].Vttablets[currentPrimaryTabletIndex], onlineDDLThrottlerAppName)
body, err = unthrottleApp(currentPrimaryTablet, onlineDDLThrottlerAppName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
Expand All @@ -563,7 +579,8 @@ func TestSchemaChange(t *testing.T) {
assert.NoError(t, err)
assert.Contains(t, body, onlineDDLThrottlerAppName)
}

})
t.Run("expect completion", func(t *testing.T) {
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})
@@ -581,7 +598,7 @@ func TestSchemaChange(t *testing.T) {
switch shard {
case "-80":
// PRS for this tablet, we promoted tablet[1]
require.Equal(t, shards[0].Vttablets[reparentTabletIndex].Alias, tablet)
require.Equal(t, reparentTablet.Alias, tablet)
case "80-":
// No PRS for this tablet
require.Equal(t, shards[1].Vttablets[0].Alias, tablet)
@@ -596,6 +613,105 @@ func TestSchemaChange(t *testing.T) {
})
})
}

// reparent shard -80 to replica
// and then reparent it back to original state
// (two pretty much identical tests, the point is to end up with original state)
for _, currentPrimaryTabletIndex := range []int{0, 1} {
currentPrimaryTablet := shards[0].Vttablets[currentPrimaryTabletIndex]
reparentTablet := shards[0].Vttablets[1-currentPrimaryTabletIndex]

t.Run(fmt.Sprintf("PlannedReparentShard via postponed %d/2", (currentPrimaryTabletIndex+1)), func(t *testing.T) {

insertRows(t, 2)

uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess --postpone-completion", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

t.Run("wait for migration to run", func(t *testing.T) {
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
})
t.Run("wait for vreplication to run on shard -80", func(t *testing.T) {
vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, currentPrimaryTablet, uuid, normalMigrationWait, "Copying", "Running")
require.Contains(t, []string{"Copying", "Running"}, vreplStatus)
})
t.Run("wait for vreplication to run on shard 80-", func(t *testing.T) {
vreplStatus := onlineddl.WaitForVReplicationStatus(t, &vtParams, shards[1].Vttablets[0], uuid, normalMigrationWait, "Copying", "Running")
require.Contains(t, []string{"Copying", "Running"}, vreplStatus)
})
t.Run("check status again", func(t *testing.T) {
// again see that we're still 'running'
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
testRows(t)
})

t.Run("Check tablet", func(t *testing.T) {
// onlineddl.Executor marks this migration with its tablet alias
// reminder that onlineddl.Executor runs on the primary tablet.
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
shard := row["shard"].ToString()
tablet := row["tablet"].ToString()

switch shard {
case "-80":
require.Equal(t, currentPrimaryTablet.Alias, tablet)
case "80-":
require.Equal(t, shards[1].Vttablets[0].Alias, tablet)
default:
require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard))
}
}
})
t.Run("PRS shard -80", func(t *testing.T) {
// migration has started and completion is postponed. We now PRS
err := clusterInstance.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", keyspaceName+"/-80", "--new_primary", reparentTablet.Alias)
require.NoError(t, err, "failed PRS: %v", err)
rs := onlineddl.VtgateExecQuery(t, &vtParams, "show vitess_tablets", "")
onlineddl.PrintQueryResult(os.Stdout, rs)
})
t.Run("complete and expect completion", func(t *testing.T) {
query := fmt.Sprintf("select * from _vt.vreplication where workflow ='%s'", uuid)
rs, err := reparentTablet.VttabletProcess.QueryTablet(query, "", true)
assert.NoError(t, err)
onlineddl.PrintQueryResult(os.Stdout, rs)

onlineddl.CheckCompleteAllMigrations(t, &vtParams, len(shards))

_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})

t.Run("Check tablet post PRS", func(t *testing.T) {
// onlineddl.Executor will find that a vrepl migration started in a different tablet.
// it will own the tablet and will update 'tablet' column in _vt.schema_migrations with its own
// (promoted primary) tablet alias.
rs := onlineddl.ReadMigrations(t, &vtParams, uuid)
require.NotNil(t, rs)
for _, row := range rs.Named().Rows {
shard := row["shard"].ToString()
tablet := row["tablet"].ToString()

switch shard {
case "-80":
// PRS for this tablet
require.Equal(t, reparentTablet.Alias, tablet)
case "80-":
// No PRS for this tablet
require.Equal(t, shards[1].Vttablets[0].Alias, tablet)
default:
require.NoError(t, fmt.Errorf("unexpected shard name: %s", shard))
}
}

onlineddl.CheckRetryPartialMigration(t, &vtParams, uuid, 1)
// Now it should complete on the failed shard
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete)
})
})
}

t.Run("Online DROP, vtctl", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, onlineDDLDropTableStatement, "online", providedUUID, providedMigrationContext, "vtctl", "", "", false)
t.Run("test ready to complete", func(t *testing.T) {
30 changes: 30 additions & 0 deletions go/test/endtoend/onlineddl/vtctlutil.go
@@ -17,6 +17,7 @@ limitations under the License.
package onlineddl

import (
"fmt"
"testing"

"vitess.io/vitess/go/test/endtoend/cluster"
@@ -31,3 +32,32 @@ func CheckCancelAllMigrationsViaVtctl(t *testing.T, vtctlclient *cluster.VtctlCl
_, err := vtctlclient.ApplySchemaWithOutput(keyspace, cancelQuery, cluster.VtctlClientParams{SkipPreflight: true})
assert.NoError(t, err)
}

// UpdateThrottlerTopoConfig runs vtctlclient UpdateThrottlerConfig
func UpdateThrottlerTopoConfig(clusterInstance *cluster.LocalProcessCluster, enable bool, disable bool, threshold float64, metricsQuery string, viaVtctldClient bool) (result string, err error) {
args := []string{}
if !viaVtctldClient {
args = append(args, "--")
}
args = append(args, "UpdateThrottlerConfig")
if enable {
args = append(args, "--enable")
}
if disable {
args = append(args, "--disable")
}
if threshold > 0 {
args = append(args, "--threshold", fmt.Sprintf("%f", threshold))
}
if metricsQuery != "" {
args = append(args, "--custom-query", metricsQuery)
args = append(args, "--check-as-check-self")
} else {
args = append(args, "--check-as-check-shard")
}
args = append(args, clusterInstance.Keyspaces[0].Name)
if viaVtctldClient {
return clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args...)
}
return clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(args...)
}
24 changes: 9 additions & 15 deletions go/test/endtoend/onlineddl/vttablet_util.go
@@ -31,9 +31,9 @@ import (
)

// WaitForVReplicationStatus waits for a vreplication stream to be in one of given states, or timeout
func WaitForVReplicationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...string) (status string) {
func WaitForVReplicationStatus(t *testing.T, vtParams *mysql.ConnParams, tablet *cluster.Vttablet, uuid string, timeout time.Duration, expectStatuses ...string) (status string) {

query, err := sqlparser.ParseAndBind("select workflow, state from _vt.vreplication where workflow=%a",
query, err := sqlparser.ParseAndBind("select state from _vt.vreplication where workflow=%a",
sqltypes.StringBindVariable(uuid),
)
require.NoError(t, err)
@@ -45,22 +45,16 @@ func WaitForVReplicationStatus(t *testing.T, vtParams *mysql.ConnParams, shards
startTime := time.Now()
lastKnownStatus := ""
for time.Since(startTime) < timeout {
countMatchedShards := 0
r, err := tablet.VttabletProcess.QueryTablet(query, "", true)
require.NoError(t, err)

for _, shard := range shards {
r, err := shard.Vttablets[0].VttabletProcess.QueryTablet(query, "", false)
require.NoError(t, err)

for _, row := range r.Named().Rows {
lastKnownStatus = row["state"].ToString()
if row["workflow"].ToString() == uuid && statusesMap[lastKnownStatus] {
countMatchedShards++
}
if row := r.Named().Row(); row != nil {
lastKnownStatus, err = row.ToString("state")
assert.NoError(t, err)
if statusesMap[lastKnownStatus] {
return lastKnownStatus
}
}
if countMatchedShards == len(shards) {
return lastKnownStatus
}
time.Sleep(1 * time.Second)
}
return lastKnownStatus
(Diffs for the remaining 6 of the 10 changed files are not shown here.)
