Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reparent_journal: add backwards compatible alter statement #9439

Merged
merged 4 commits into from
Dec 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions go/cmd/vtbackup/vtbackup.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,11 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err := mysqld.ExecuteSuperQueryList(ctx, cmds); err != nil {
return fmt.Errorf("can't initialize database: %v", err)
}

// Execute Alter commands on reparent_journal and ignore errors
cmds = mysqlctl.AlterReparentJournal()
_ = mysqld.ExecuteSuperQueryList(ctx, cmds)

backupParams.BackupTime = time.Now()
// Now we're ready to take the backup.
if err := mysqlctl.Backup(ctx, backupParams); err != nil {
Expand Down
12 changes: 6 additions & 6 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,19 @@ func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr Tabl
// Start starts the topology watcher
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
go func() {
defer tw.wg.Done()
ticker := time.NewTicker(tw.refreshInterval)
go func(t *TopologyWatcher) {
defer t.wg.Done()
ticker := time.NewTicker(t.refreshInterval)
defer ticker.Stop()
for {
tw.loadTablets()
t.loadTablets()
select {
case <-tw.ctx.Done():
case <-t.ctx.Done():
return
case <-ticker.C:
}
}
}()
}(tw)
}

// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder.
Expand Down
14 changes: 12 additions & 2 deletions go/vt/mysqlctl/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package mysqlctl

/*
This file contains the reparenting methods for mysqlctl.

TODO(alainjobart) Once refactoring is done, remove unused code paths.
*/

import (
Expand Down Expand Up @@ -51,6 +49,18 @@ func CreateReparentJournal() []string {
ENGINE=InnoDB`, mysql.MaximumPositionSize)}
}

// AlterReparentJournal returns the commands to execute to change
// column master_alias -> primary_alias or the other way
// In 13.0.0 we introduce renaming of primary_alias -> master_alias.
// This is to support in-place downgrade from a later version.
// In 14.0.0 we will replace this with renaming of master_alias -> primary_alias.
// This is to support in-place upgrades from 13.0.x to 14.0.x
func AlterReparentJournal() []string {
return []string{
"`ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL`",
}
}

// PopulateReparentJournal returns the SQL command to use to populate
// the _vt.reparent_journal table, as well as the time_created_ns
// value used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ func TestInitShardPrimary(t *testing.T) {
"FAKE RESET ALL REPLICATION",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}

Expand Down Expand Up @@ -111,8 +113,10 @@ func TestInitShardPrimaryNoFormerPrimary(t *testing.T) {
"FAKE RESET ALL REPLICATION",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}

Expand Down
79 changes: 40 additions & 39 deletions go/vt/vtgate/executor_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"testing"
"time"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/vt/discovery"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/vtgate/engine"

querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -32,10 +36,7 @@ import (

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/cache"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/discovery"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/sandboxconn"
)
Expand All @@ -54,6 +55,42 @@ func TestStreamSQLUnsharded(t *testing.T) {
}
}

func TestStreamSQLSharded(t *testing.T) {
// Special setup: Don't use createLegacyExecutorEnv.
cell := "aa"
hc := discovery.NewFakeLegacyHealthCheck()
s := createSandbox("TestExecutor")
s.VSchema = executorVSchema
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
serv := newSandboxForCells([]string{cell})
resolver := newTestLegacyResolver(hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
require.NoError(t, err)
wantResult := &sqltypes.Result{
Fields: sandboxconn.SingleRowResult.Fields,
Rows: [][]sqltypes.Value{
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
},
}
if !result.Equal(wantResult) {
t.Errorf("result: %+v, want %+v", result, wantResult)
}
}

func TestVStreamSQLUnsharded(t *testing.T) {
executor, _, _, sbcLookup := createExecutorEnv()
logChan := QueryLogger.Subscribe("Test")
Expand Down Expand Up @@ -150,42 +187,6 @@ func TestVStreamSQLUnsharded(t *testing.T) {
require.Equal(t, expectedDeletes, numDeletes)
}

func TestStreamSQLSharded(t *testing.T) {
// Special setup: Don't use createLegacyExecutorEnv.
cell := "aa"
hc := discovery.NewFakeLegacyHealthCheck()
s := createSandbox("TestExecutor")
s.VSchema = executorVSchema
getSandbox(KsTestUnsharded).VSchema = unshardedVSchema
serv := new(sandboxTopo)
resolver := newTestLegacyResolver(hc, serv, cell)
shards := []string{"-20", "20-40", "40-60", "60-80", "80-a0", "a0-c0", "c0-e0", "e0-"}
for _, shard := range shards {
_ = hc.AddTestTablet(cell, shard, 1, "TestExecutor", shard, topodatapb.TabletType_PRIMARY, true, 1, nil)
}
executor := NewExecutor(context.Background(), serv, cell, resolver, false, false, testBufferSize, cache.DefaultConfig, nil, false)

sql := "stream * from sharded_user_msgs"
result, err := executorStreamMessages(executor, sql)
require.NoError(t, err)
wantResult := &sqltypes.Result{
Fields: sandboxconn.SingleRowResult.Fields,
Rows: [][]sqltypes.Value{
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
sandboxconn.StreamRowResult.Rows[0],
},
}
if !result.Equal(wantResult) {
t.Errorf("result: %+v, want %+v", result, wantResult)
}
}

func executorStreamMessages(executor *Executor, sql string) (qr *sqltypes.Result, err error) {
results := make(chan *sqltypes.Result, 100)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
Expand Down
14 changes: 13 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ func (tm *TabletManager) InitPrimary(ctx context.Context) (string, error) {
return "", err
}

// Execute ALTER statement on reparent_journal table and ignore errors
cmds = mysqlctl.AlterReparentJournal()
_ = tm.MysqlDaemon.ExecuteSuperQueryList(ctx, cmds)

// get the current replication position
pos, err := tm.MysqlDaemon.PrimaryPosition()
if err != nil {
Expand Down Expand Up @@ -290,7 +294,15 @@ func (tm *TabletManager) PopulateReparentJournal(ctx context.Context, timeCreate
return err
}
cmds := mysqlctl.CreateReparentJournal()
cmds = append(cmds, mysqlctl.PopulateReparentJournal(timeCreatedNS, actionName, topoproto.TabletAliasString(primaryAlias), pos))
if err := tm.MysqlDaemon.ExecuteSuperQueryList(ctx, cmds); err != nil {
return err
}

// Execute ALTER statement on reparent_journal table and ignore errors
cmds = mysqlctl.AlterReparentJournal()
_ = tm.MysqlDaemon.ExecuteSuperQueryList(ctx, cmds)

cmds = []string{mysqlctl.PopulateReparentJournal(timeCreatedNS, actionName, topoproto.TabletAliasString(primaryAlias), pos)}

return tm.MysqlDaemon.ExecuteSuperQueryList(ctx, cmds)
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestEmergencyReparentShard(t *testing.T) {
"STOP SLAVE IO_THREAD",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.FakeMysqlDaemon.PromoteResult = mysql.Position{
Expand Down Expand Up @@ -229,6 +230,7 @@ func TestEmergencyReparentShardPrimaryElectNotBest(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/wrangler/testlib/planned_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestPlannedReparentShardNoPrimaryProvided(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -192,6 +193,7 @@ func TestPlannedReparentShardNoError(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -303,8 +305,10 @@ func TestPlannedReparentInitialization(t *testing.T) {
newPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -402,6 +406,7 @@ func TestPlannedReparentShardWaitForPositionFail(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -503,6 +508,7 @@ func TestPlannedReparentShardWaitForPositionTimeout(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -587,6 +593,7 @@ func TestPlannedReparentShardRelayLogError(t *testing.T) {
primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
primary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -662,6 +669,7 @@ func TestPlannedReparentShardRelayLogErrorStartReplication(t *testing.T) {
primary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
primary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -753,6 +761,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
newPrimary.StartActionLoop(t, wr)
Expand Down Expand Up @@ -818,6 +827,7 @@ func TestPlannedReparentShardPromoteReplicaFail(t *testing.T) {
"START SLAVE",
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
Expand Down Expand Up @@ -873,6 +883,7 @@ func TestPlannedReparentShardSamePrimary(t *testing.T) {
oldPrimary.FakeMysqlDaemon.ExpectedExecuteSuperQueryList = []string{
"CREATE DATABASE IF NOT EXISTS _vt",
"SUBCREATE TABLE IF NOT EXISTS _vt.reparent_journal",
"ALTER TABLE _vt.reparent_journal CHANGE COLUMN primary_alias master_alias VARBINARY(32) NOT NULL",
"SUBINSERT INTO _vt.reparent_journal (time_created_ns, action_name, master_alias, replication_position) VALUES",
}
oldPrimary.StartActionLoop(t, wr)
Expand Down