Skip to content

Commit

Permalink
Merge pull request vitessio#9481 from planetscale/switch_writes_lock_…
Browse files Browse the repository at this point in the history
…table

Use LOCK TABLES during SwitchWrites to prevent lost writes
  • Loading branch information
harshit-gangal authored and aliulis committed Feb 23, 2022
1 parent 432b9c8 commit 0c343a5
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 11 deletions.
24 changes: 23 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
vtdataroot string
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVtctldArgs = []string{"-remote_operation_timeout", "600s", "-topo_etcd_lease_ttl", "120"}
)

// ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster
Expand Down Expand Up @@ -164,7 +165,7 @@ func NewVitessCluster(t *testing.T, name string, cellNames []string, clusterConf
vc.Vtctld = vtctld
require.NotNil(t, vc.Vtctld)
// use first cell as `-cell`
vc.Vtctld.Setup(cellNames[0])
vc.Vtctld.Setup(cellNames[0], extraVtctldArgs...)

vc.Vtctl = cluster.VtctlProcessInstance(vc.ClusterConfig.topoPort, vc.ClusterConfig.hostname)
require.NotNil(t, vc.Vtctl)
Expand Down Expand Up @@ -476,3 +477,24 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
}
return tablets
}

// startQuery opens a vtgate connection, begins a transaction, and executes the
// given query inside it without committing. It returns two callbacks: the first
// commits the still-open transaction and the second rolls it back; each one
// also closes the underlying connection when it is done.
func (vc *VitessCluster) startQuery(t *testing.T, query string) (func(t *testing.T), func(t *testing.T)) {
	conn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
	// Open the transaction and run the caller's query inside it; both must succeed.
	for _, stmt := range []string{"begin", query} {
		_, err := conn.ExecuteFetch(stmt, 1000, false)
		require.NoError(t, err)
	}

	commitFn := func(t *testing.T) {
		_, commitErr := conn.ExecuteFetch("commit", 1000, false)
		log.Infof("startQuery:commit:err: %+v", commitErr)
		conn.Close()
		log.Infof("startQuery:after closing connection")
	}
	rollbackFn := func(t *testing.T) {
		defer conn.Close()
		_, rollbackErr := conn.ExecuteFetch("rollback", 1000, false)
		log.Infof("startQuery:rollback:err: %+v", rollbackErr)
	}
	return commitFn, rollbackFn
}
4 changes: 3 additions & 1 deletion go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func execVtgateQuery(t *testing.T, conn *mysql.Conn, database string, query stri
if strings.TrimSpace(query) == "" {
return nil
}
execQuery(t, conn, "use `"+database+"`;")
if database != "" {
execQuery(t, conn, "use `"+database+"`;")
}
execQuery(t, conn, "begin")
qr := execQuery(t, conn, query)
execQuery(t, conn, "commit")
Expand Down
38 changes: 29 additions & 9 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ var (
targetThrottlerAppName = "vreplication"
)

// for some tests we keep an open transaction during a SwitchWrites and commit it afterwards, to reproduce https://github.com/vitessio/vitess/issues/9400
// we also then delete the extra row (if one was added) so that the row counts for future comparisons stay the same
const openTxQuery = "insert into customer(cid, name, typ, sport, meta) values(4, 'openTxQuery',1,'football,baseball','{}');"
const deleteOpenTxQuery = "delete from customer where name = 'openTxQuery'"

func init() {
defaultRdonly = 0
defaultReplicas = 1
Expand Down Expand Up @@ -108,7 +113,8 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
insertInitialData(t)
materializeRollup(t)

shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName)
shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName, false)

// the tenant table was to test a specific case with binary sharding keys. Drop it now so that we don't
// have to update the rest of the tests
execVtgateQuery(t, vtgateConn, "customer", "drop table tenant")
Expand Down Expand Up @@ -138,7 +144,7 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
allCellNames = "zone1,zone2"

vc = NewVitessCluster(t, "TestBasicVreplicationWorkflow", cells, mainClusterConfig)
vc = NewVitessCluster(t, "TestMultiCellVreplicationWorkflow", cells, mainClusterConfig)
require.NotNil(t, vc)
defaultCellName := "zone1"
defaultCell = vc.Cells[defaultCellName]
Expand All @@ -158,7 +164,7 @@ func TestMultiCellVreplicationWorkflow(t *testing.T) {
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
insertInitialData(t)
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name)
shardCustomer(t, true, []*Cell{cell1, cell2}, cell2.Name, true)
}

func TestCellAliasVreplicationWorkflow(t *testing.T) {
Expand All @@ -167,7 +173,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
defer func() {
mainClusterConfig.vreplicationCompressGTID = false
}()
vc = NewVitessCluster(t, "TestBasicVreplicationWorkflow", cells, mainClusterConfig)
vc = NewVitessCluster(t, "TestCellAliasVreplicationWorkflow", cells, mainClusterConfig)
require.NotNil(t, vc)
allCellNames = "zone1,zone2"
defaultCellName := "zone1"
Expand All @@ -192,7 +198,7 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
defer vtgateConn.Close()
verifyClusterHealth(t, vc)
insertInitialData(t)
shardCustomer(t, true, []*Cell{cell1, cell2}, "alias")
shardCustomer(t, true, []*Cell{cell1, cell2}, "alias", false)
}

func insertInitialData(t *testing.T) {
Expand Down Expand Up @@ -239,7 +245,7 @@ func insertMoreProductsForTargetThrottler(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", sql)
}

func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) {
func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string, withOpenTx bool) {
t.Run("shardCustomer", func(t *testing.T) {
workflow := "p2c"
sourceKs := "product"
Expand All @@ -255,12 +261,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
t.Fatal(err)
}

tables := "customer,tenant"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

// Assume we are operating on first cell
defaultCell := cells[0]
custKs := vc.Cells[defaultCell.Name].Keyspaces["customer"]

tables := "customer,tenant"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

customerTab1 := custKs.Shards["-80"].Tablets["zone1-200"].Vttablet
customerTab2 := custKs.Shards["80-"].Tablets["zone1-300"].Vttablet

Expand All @@ -278,8 +285,21 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
switchReadsDryRun(t, allCellNames, ksWorkflow, dryRunResultsReadCustomerShard)
switchReads(t, allCellNames, ksWorkflow)
require.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query))

var commit func(t *testing.T)
if withOpenTx {
commit, _ = vc.startQuery(t, openTxQuery)
}
switchWritesDryRun(t, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
switchWrites(t, ksWorkflow, false)
if withOpenTx && commit != nil {
commit(t)
}
vdiff(t, "product.p2c_reverse", "")
if withOpenTx {
execVtgateQuery(t, vtgateConn, "", deleteOpenTxQuery)
}

ksShards := []string{"product/0", "customer/-80", "customer/80-"}
printShardPositions(vc, ksShards)
insertQuery2 := "insert into customer(name, cid) values('tempCustomer2', 100)"
Expand Down
57 changes: 57 additions & 0 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"sync"
"time"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/discovery"

"vitess.io/vitess/go/json2"
Expand All @@ -48,6 +49,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand All @@ -63,6 +65,11 @@ type accessType int
const (
allowWrites = accessType(iota)
disallowWrites

// number of LOCK TABLES cycles to perform on the sources during SwitchWrites
lockTablesCycles = 2
// time to wait between LOCK TABLES cycles on the sources during SwitchWrites
lockTablesCycleDelay = time.Duration(100 * time.Millisecond)
)

// trafficSwitcher contains the metadata for switching read and write traffic
Expand Down Expand Up @@ -487,6 +494,7 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
sw.cancelMigration(ctx, sm)
return 0, sw.logs(), nil
}

ts.wr.Logger().Infof("Stopping streams")
sourceWorkflows, err = sw.stopStreams(ctx, sm)
if err != nil {
Expand All @@ -499,13 +507,30 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
sw.cancelMigration(ctx, sm)
return 0, nil, err
}

ts.wr.Logger().Infof("Stopping source writes")
if err := sw.stopSourceWrites(ctx); err != nil {
ts.wr.Logger().Errorf("stopSourceWrites failed: %v", err)
sw.cancelMigration(ctx, sm)
return 0, nil, err
}

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles)
// Doing this twice with a pause in-between to catch any writes that may have raced in between
// the tablet's deny list check and the first mysqld side table lock.
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
ts.Logger().Errorf("Failed to execute LOCK TABLES (attempt %d of %d) on sources: %v", cnt, lockTablesCycles, err)
sw.cancelMigration(ctx, sm)
return 0, nil, err
}
// No need to UNLOCK the tables as the connection was closed once the locks were acquired
// and thus the locks released.
time.Sleep(lockTablesCycleDelay)
}
}

ts.wr.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, timeout); err != nil {
ts.wr.Logger().Errorf("waitForCatchup failed: %v", err)
Expand Down Expand Up @@ -1014,6 +1039,38 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
})
}

// executeLockTablesOnSource executes a LOCK TABLES tb1 READ, tbl2 READ,... statement on each
// source shard's primary tablet using a non-pooled connection as the DBA user. The connection
// is closed when the LOCK TABLES statement returns, so we immediately release the LOCKs.
// executeLockTablesOnSource executes a "LOCK TABLES tbl1 READ, tbl2 READ, ..."
// statement covering all of the workflow's tables on each source shard's
// primary tablet, as the DBA user over a non-pooled connection. The connection
// is closed as soon as the statement returns, which releases the locks
// immediately — the point is the brief barrier, not holding the locks.
// It returns an error if the workflow has no tables or if the statement
// fails on any source primary.
func (ts *trafficSwitcher) executeLockTablesOnSource(ctx context.Context) error {
	ts.Logger().Infof("Locking (and then immediately unlocking) the following tables on source keyspace %v: %v", ts.SourceKeyspaceName(), ts.Tables())
	if len(ts.Tables()) == 0 {
		return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no tables found in the source keyspace %v associated with the %s workflow", ts.SourceKeyspaceName(), ts.WorkflowName())
	}

	var sb strings.Builder
	sb.WriteString("LOCK TABLES ")
	for _, tableName := range ts.Tables() {
		sb.WriteString(fmt.Sprintf("%s READ,", sqlescape.EscapeID(tableName)))
	}
	// Drop the trailing comma left by the loop above.
	lockStmt := strings.TrimSuffix(sb.String(), ",")

	return ts.ForAllSources(func(source *workflow.MigrationSource) error {
		primary := source.GetPrimary()
		if primary == nil {
			return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary found for source shard %s", source.GetShard())
		}
		tablet := primary.Tablet
		if _, err := ts.wr.ExecuteFetchAsDba(ctx, tablet.Alias, lockStmt, 1, false, true); err != nil {
			ts.Logger().Errorf("Error executing %s on source tablet %v: %v", lockStmt, tablet, err)
			return err
		}
		return nil
	})
}

func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, filteredReplicationWaitTime)
defer cancel()
Expand Down
20 changes: 20 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func TestTableMigrateMainflow(t *testing.T) {
}
cancelMigration()

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
want = "DeadlineExceeded"
if err == nil || !strings.Contains(err.Error(), want) {
Expand Down Expand Up @@ -846,6 +847,8 @@ func TestTableMigrateOneToMany(t *testing.T) {
require.Error(t, err, "Workflow has not completed, cannot DropSources")

tme.dbSourceClients[0].addQueryRE(tsCheckJournals, &sqltypes.Result{}, nil)

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1049,6 +1052,7 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {
}
deleteTargetVReplication()

switchWrites(tme)
_, results, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false, true)
require.NoError(t, err)
require.Empty(t, cmp.Diff(wantdryRunWrites, *results))
Expand Down Expand Up @@ -1134,6 +1138,7 @@ func TestMigrateFailJournal(t *testing.T) {
tme.dbSourceClients[0].addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed"))
tme.dbSourceClients[1].addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed"))

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
want := "journaling intentionally failed"
if err == nil || !strings.Contains(err.Error(), want) {
Expand Down Expand Up @@ -1195,6 +1200,7 @@ func TestTableMigrateJournalExists(t *testing.T) {
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil)

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1272,6 +1278,7 @@ func TestShardMigrateJournalExists(t *testing.T) {
tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (2)", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil)

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1334,6 +1341,7 @@ func TestTableMigrateCancel(t *testing.T) {
}
cancelMigration()

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true, false, false, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1393,6 +1401,7 @@ func TestTableMigrateCancelDryRun(t *testing.T) {
}
cancelMigration()

switchWrites(tme)
_, dryRunResults, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true, false, false, true)
require.NoError(t, err)
require.Empty(t, cmp.Diff(want, *dryRunResults))
Expand Down Expand Up @@ -1491,6 +1500,7 @@ func TestTableMigrateNoReverse(t *testing.T) {
}
deleteTargetVReplication()

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1532,6 +1542,7 @@ func TestMigrateFrozen(t *testing.T) {
), nil)
tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1904,6 +1915,7 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) {
}
cancelMigration()

switchWrites(tme)
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
want = "DeadlineExceeded"
if err == nil || !strings.Contains(err.Error(), want) {
Expand Down Expand Up @@ -2152,3 +2164,11 @@ func stoppedResult(id int) *sqltypes.Result {
func runningResult(id int) *sqltypes.Result {
return getResult(id, "Running", tpChoice.keyspace, tpChoice.shard)
}

// switchWrites registers the LOCK TABLES statement that SwitchWrites now
// issues on the sources with the test environment's fake database, so the
// statement finds a matching (empty) result instead of failing. It accepts
// either migrater environment flavor; any other argument is a no-op.
func switchWrites(tmeT interface{}) {
	// Single shared literal: both environments expect the exact same statement.
	const lockTablesQuery = "lock tables `t1` read,`t2` read"
	switch tme := tmeT.(type) {
	case *testMigraterEnv:
		tme.tmeDB.AddQuery(lockTablesQuery, &sqltypes.Result{})
	case *testShardMigraterEnv:
		tme.tmeDB.AddQuery(lockTablesQuery, &sqltypes.Result{})
	}
}
1 change: 1 addition & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) {
tme.tmeDB.AddQuery("drop table vt_ks2.t1", noResult)
tme.tmeDB.AddQuery("drop table vt_ks2.t2", noResult)
tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult)
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult)
tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 2", noResult)

Expand Down

0 comments on commit 0c343a5

Please sign in to comment.