Skip to content

Commit

Permalink
Improve error and type handling in LOCK TABLE related code
Browse files (browse the repository at this point in the history)
And update tests

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jan 8, 2022
1 parent 3f4fea1 commit e61e873
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 19 deletions.
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
mainClusterConfig *ClusterConfig
externalClusterConfig *ClusterConfig
extraVTGateArgs = []string{"-tablet_refresh_interval", "10ms"}
extraVtctldArgs = []string{"-remote_operation_timeout", "600s", "-topo_etcd_lease_ttl", "120"}
)

// ClusterConfig defines the parameters like ports, tmpDir, tablet types which uniquely define a vitess cluster
Expand Down Expand Up @@ -170,7 +171,7 @@ func NewVitessCluster(t *testing.T, name string, cellNames []string, clusterConf
vc.Vtctld = vtctld
require.NotNil(t, vc.Vtctld)
// use first cell as `-cell`
vc.Vtctld.Setup(cellNames[0])
vc.Vtctld.Setup(cellNames[0], extraVtctldArgs...)

vc.Vtctl = cluster.VtctlProcessInstance(vc.ClusterConfig.topoPort, vc.ClusterConfig.hostname)
require.NotNil(t, vc.Vtctl)
Expand Down
54 changes: 36 additions & 18 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand All @@ -65,8 +66,8 @@ type tableLockAction int
const (
allowWrites = accessType(iota)
disallowWrites
tableLockRequest = tableLockAction(0)
tableLockRelease = tableLockAction(1)
tableLockAcquire = tableLockAction(iota)
tableLockRelease
)

// trafficSwitcher contains the metadata for switching read and write traffic
Expand Down Expand Up @@ -140,25 +141,31 @@ func (ts *trafficSwitcher) ForAllSources(f func(source *workflow.MigrationSource
}

func (ts *trafficSwitcher) ExecQueryOnAllPrimarySources(ctx context.Context, query string) error {
if ts.Sources() == nil || len(ts.Sources()) == 0 {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "workflow %s has no source shards in the %s keyspace", ts.WorkflowName(), ts.SourceKeyspaceName())
}
var wg sync.WaitGroup
allErrors := &concurrency.AllErrorRecorder{}
type doneRecorder struct {
record map[string]bool
record map[*topodatapb.TabletAlias]bool
mu sync.Mutex
}
done := doneRecorder{record: make(map[string]bool)}
done := doneRecorder{record: make(map[*topodatapb.TabletAlias]bool)}

for _, source := range ts.sources {
for _, source := range ts.Sources() {
if source.GetPrimary() == nil {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "source shard %v does not have a primary tablet", source.GetShard())
}
wg.Add(1)
go func(tablet *topodatapb.Tablet) {
defer wg.Done()
done.mu.Lock()
defer done.mu.Unlock()
alias := tablet.Alias.String()
alias := tablet.GetAlias()

if !done.record[alias] {
if _, err := ts.wr.ExecuteFetchAsDba(ctx, tablet.Alias, query, 0, false, false); err != nil {
log.Infof("error locking tables on source tablet %v: %v. Query used: %s", tablet, err, query)
if _, err := ts.wr.ExecuteFetchAsDba(ctx, alias, query, 0, false, false); err != nil {
ts.Logger().Warningf("Error locking tables on source tablet %v: %v", tablet, err)
allErrors.RecordError(err)
}
done.record[alias] = true
Expand Down Expand Up @@ -1001,7 +1008,12 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {
var err error
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
if err = ts.changeTableSourceWrites(ctx, disallowWrites); err == nil {
err = ts.manageTableLocksSourceWrites(ctx, tableLockRequest)
err = ts.manageTableLocksOnSources(ctx, tableLockAcquire)
defer func() {
if err := ts.manageTableLocksOnSources(ctx, tableLockRelease); err != nil {
ts.Logger().Warningf("error releasing table locks on source keyspace %v: %v", ts.SourceKeyspaceName(), err)
}
}()
}
} else {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites)
Expand Down Expand Up @@ -1034,20 +1046,23 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a
})
}

func (ts *trafficSwitcher) manageTableLocksSourceWrites(ctx context.Context, action tableLockAction) error {
ts.Logger().Infof("Locking tables on source keyspace %v, tables: %v", ts.TargetKeyspaceName(), ts.Tables())
func (ts *trafficSwitcher) manageTableLocksOnSources(ctx context.Context, action tableLockAction) error {
ts.Logger().Infof("Locking the following tables on source keyspace %v: %v", ts.SourceKeyspaceName(), ts.Tables())
if ts.Tables() == nil || len(ts.Tables()) == 0 {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "no tables found in the source keyspace %v associated with the %s workflow", ts.SourceKeyspaceName(), ts.WorkflowName())
}
var query string
switch action {
case tableLockRequest:
case tableLockAcquire:
sb := strings.Builder{}
sb.WriteString("LOCK TABLES ")
sb.WriteString("lock tables ")
for _, table := range ts.Tables() {
sb.WriteString(fmt.Sprintf("%s READ,", sqlescape.EscapeID(table)))
sb.WriteString(fmt.Sprintf("%s read,", sqlescape.EscapeID(table)))
}
// trim extra trailing comma
query = sb.String()[:sb.Len()-1]
case tableLockRelease:
query = "UNLOCK TABLES"
query = "unlock tables"
}
return ts.ExecQueryOnAllPrimarySources(ctx, query)
}
Expand Down Expand Up @@ -1090,9 +1105,12 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati
func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *workflow.StreamMigrator) {
var err error
if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
if err = ts.changeTableSourceWrites(ctx, allowWrites); err == nil {
ts.manageTableLocksSourceWrites(ctx, tableLockRelease)
}
err = ts.changeTableSourceWrites(ctx, allowWrites)
defer func() {
if err := ts.manageTableLocksOnSources(ctx, tableLockRelease); err != nil {
ts.Logger().Warningf("Error releasing table locks on source keyspace %v: %v", ts.SourceKeyspaceName(), err)
}
}()
} else {
err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
Expand Down
55 changes: 55 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ func TestTableMigrateMainflow(t *testing.T) {
}
cancelMigration()

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
want = "DeadlineExceeded"
if err == nil || !strings.Contains(err.Error(), want) {
Expand Down Expand Up @@ -862,6 +867,11 @@ func testTableMigrateOneToMany(t *testing.T, keepData, keepRoutingRules bool) {
require.Error(t, err, "Workflow has not completed, cannot DropSources")

tme.dbSourceClients[0].addQueryRE(tsCheckJournals, &sqltypes.Result{}, nil)
switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1080,6 +1090,11 @@ func TestTableMigrateOneToManyDryRun(t *testing.T) {
}
deleteTargetVReplication()

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, results, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false, true)
require.NoError(t, err)
require.Empty(t, cmp.Diff(wantdryRunWrites, *results))
Expand Down Expand Up @@ -1165,6 +1180,11 @@ func TestMigrateFailJournal(t *testing.T) {
tme.dbSourceClients[0].addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed"))
tme.dbSourceClients[1].addQueryRE("insert into _vt.resharding_journal", nil, errors.New("journaling intentionally failed"))

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
want := "journaling intentionally failed"
if err == nil || !strings.Contains(err.Error(), want) {
Expand Down Expand Up @@ -1226,6 +1246,11 @@ func TestTableMigrateJournalExists(t *testing.T) {
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 1", stoppedResult(1), nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil)

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1303,6 +1328,11 @@ func TestShardMigrateJournalExists(t *testing.T) {
tme.dbTargetClients[1].addQuery("update _vt.vreplication set message = 'FROZEN' where id in (2)", &sqltypes.Result{}, nil)
tme.dbTargetClients[1].addQuery("select * from _vt.vreplication where id = 2", stoppedResult(2), nil)

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, true, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1365,6 +1395,11 @@ func TestTableMigrateCancel(t *testing.T) {
}
cancelMigration()

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true, false, false, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1424,6 +1459,11 @@ func TestTableMigrateCancelDryRun(t *testing.T) {
}
cancelMigration()

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, dryRunResults, err := tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, true, false, false, true)
require.NoError(t, err)
require.Empty(t, cmp.Diff(want, *dryRunResults))
Expand Down Expand Up @@ -1522,6 +1562,11 @@ func TestTableMigrateNoReverse(t *testing.T) {
}
deleteTargetVReplication()

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 1*time.Second, false, false, false, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1563,6 +1608,11 @@ func TestMigrateFrozen(t *testing.T) {
), nil)
tme.dbTargetClients[1].addQuery(vreplQueryks2, &sqltypes.Result{}, nil)

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -1935,6 +1985,11 @@ func TestShardMigrateNoAvailableTabletsForReverseReplication(t *testing.T) {
}
cancelMigration()

switchWrites := func() {
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", &sqltypes.Result{})
tme.tmeDB.AddQuery("unlock tables", &sqltypes.Result{})
}
switchWrites()
_, _, err = tme.wr.SwitchWrites(ctx, tme.targetKeyspace, "test", 0*time.Second, false, false, true, false)
want = "DeadlineExceeded"
if err == nil || !strings.Contains(err.Error(), want) {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/wrangler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ func expectMoveTablesQueries(t *testing.T, tme *testMigraterEnv) {
tme.tmeDB.AddQuery("drop table vt_ks2.t1", noResult)
tme.tmeDB.AddQuery("drop table vt_ks2.t2", noResult)
tme.tmeDB.AddQuery("update _vt.vreplication set message='Picked source tablet: cell:\"cell1\" uid:10 ' where id=1", noResult)
tme.tmeDB.AddQuery("lock tables `t1` read,`t2` read", noResult)
tme.tmeDB.AddQuery("unlock tables", noResult)
tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", noResult)
tme.tmeDB.AddQuery("select 1 from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 2", noResult)

Expand Down

0 comments on commit e61e873

Please sign in to comment.