Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Improve MoveTables Create Error Handling #13737

Merged
merged 22 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5a908cf
MoveTables: don't create routing rules until after streams
mattlord Aug 7, 2023
e478ab2
Add vtctlclient unit test (that fails on main)
mattlord Aug 7, 2023
d200fd7
Improve comment
mattlord Aug 7, 2023
637ba76
Pedantic cleanup
mattlord Aug 8, 2023
a1dd8e1
Minor changes after self review
mattlord Aug 8, 2023
411f2fe
Stop using t.Fatal in new tests
mattlord Aug 8, 2023
d0d0f28
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 9, 2023
b47fe10
Add vtctldclient unit test
mattlord Aug 10, 2023
d7360b8
Automatically clean up on MoveTables Create error
mattlord Aug 14, 2023
83f5938
Add missing else in workflow server
mattlord Aug 14, 2023
3c381c4
Restore original logic
mattlord Aug 14, 2023
ed1beae
Add named return values for workflow case
mattlord Aug 14, 2023
fe4a51e
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 15, 2023
d98416f
Improve vtctldclient unit test
mattlord Aug 15, 2023
5741fcf
Minor tweaks
mattlord Aug 15, 2023
982bc2f
Minor formatting improvements
mattlord Aug 15, 2023
f0713ee
Must stop nitting...
mattlord Aug 15, 2023
e3fc307
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 16, 2023
1da742b
Merge remote-tracking branch 'origin/main' into movetables_routing
mattlord Aug 20, 2023
24d8332
Don't fail e2e workflow if schema tracker is slow
mattlord Aug 20, 2023
80f8a03
Turns out that WorkflowDelete isn't the right fit
mattlord Aug 20, 2023
dc2742f
Address review comment
mattlord Aug 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 36 additions & 32 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,39 +1004,7 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}
}
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return nil, err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil {
return nil, err
}

if vschema != nil {
// We added to the vschema.
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return nil, err
}
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}
ms := &vtctldatapb.MaterializeSettings{
Workflow: req.Workflow,
MaterializationIntent: vtctldatapb.MaterializationIntent_MOVETABLES,
Expand Down Expand Up @@ -1081,6 +1049,42 @@ func (s *Server) MoveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
return nil, err
}

// Now that the streams have been successfully created, let's put the associated
// routing rules in place.
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return nil, err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, s.ts, rules); err != nil {
return nil, err
}

if vschema != nil {
// We added to the vschema.
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return nil, err
}
}
}
if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}

if ms.SourceTimeZone != "" {
if err := mz.checkTZConversion(ctx, ms.SourceTimeZone); err != nil {
return nil, err
Expand Down
69 changes: 36 additions & 33 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,39 +212,6 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
}
}
}
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}

if vschema != nil {
// We added to the vschema.
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return err
}
}
}
if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return err
}
tabletTypes, inorder, err := discovery.ParseTabletTypesAndOrder(tabletTypesStr)
if err != nil {
return err
Expand Down Expand Up @@ -290,6 +257,42 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
return err
}

// Now that the streams have been successfully created, let's put the associated
mattlord marked this conversation as resolved.
Show resolved Hide resolved
// routing rules in place.
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
for _, table := range tables {
toSource := []string{sourceKeyspace + "." + table}
rules[table] = toSource
rules[table+"@replica"] = toSource
rules[table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[targetKeyspace+"."+table+"@replica"] = toSource
rules[targetKeyspace+"."+table+"@rdonly"] = toSource
rules[targetKeyspace+"."+table] = toSource
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}

if vschema != nil {
// We added to the vschema.
if err := wr.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil {
return err
}
}
}
if err := wr.ts.RebuildSrvVSchema(ctx, nil); err != nil {
return err
}

if sourceTimeZone != "" {
if err := mz.checkTZConversion(ctx, sourceTimeZone); err != nil {
return err
Expand Down
39 changes: 30 additions & 9 deletions go/vt/wrangler/traffic_switcher_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vschema"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
Expand All @@ -48,6 +44,11 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
)

const (
Expand Down Expand Up @@ -180,6 +181,17 @@ func newTestTableMigraterCustom(ctx context.Context, t *testing.T, sourceShards,
},
},
}
schema := &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{
{
Name: "t1",
},
{
Name: "t2",
},
},
}
tme.setPrimarySchemas(schema)
if len(sourceShards) != 1 {
if err := tme.ts.SaveVSchema(ctx, "ks1", vs); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -499,26 +511,26 @@ func newTestShardMigrater(ctx context.Context, t *testing.T, sourceShards, targe

vs := &vschemapb.Keyspace{
Sharded: true,
Vindexes: map[string]*vschema.Vindex{
Vindexes: map[string]*vschemapb.Vindex{
"thash": {
Type: "hash",
},
},
Tables: map[string]*vschema.Table{
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschema.ColumnVindex{{
ColumnVindexes: []*vschemapb.ColumnVindex{{
Columns: []string{"c1"},
Name: "thash",
}},
},
"t2": {
ColumnVindexes: []*vschema.ColumnVindex{{
ColumnVindexes: []*vschemapb.ColumnVindex{{
Columns: []string{"c1"},
Name: "thash",
}},
},
"t3": {
ColumnVindexes: []*vschema.ColumnVindex{{
ColumnVindexes: []*vschemapb.ColumnVindex{{
Columns: []string{"c1"},
Name: "thash",
}},
Expand Down Expand Up @@ -678,6 +690,15 @@ func (tme *testMigraterEnv) setPrimaryPositions() {
}
}

func (tme *testMigraterEnv) setPrimarySchemas(schema *tabletmanagerdatapb.SchemaDefinition) {
for _, primary := range tme.sourcePrimaries {
primary.FakeMysqlDaemon.Schema = schema
}
for _, primary := range tme.targetPrimaries {
primary.FakeMysqlDaemon.Schema = schema
}
}

func (tme *testMigraterEnv) expectNoPreviousJournals() {
// validate that no previous journals exist
for _, dbclient := range tme.dbSourceClients {
Expand Down
29 changes: 29 additions & 0 deletions go/vt/wrangler/traffic_switcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
Expand Down Expand Up @@ -2143,6 +2144,34 @@ func TestIsPartialMoveTables(t *testing.T) {
}
}

// TestNoOrphanedRoutingRulesOnFailedCreate tests that no orphaned routing rules
// are left in place when the workflow creation fails -- specifically at the point
// where we try and create the workflow streams.
func TestNoOrphanedRoutingRulesOnFailedCreate(t *testing.T) {
ctx := context.Background()
tme := newTestTableMigraterCustom(ctx, t, []string{"0"}, []string{"-80", "80-"}, "select * %s")
defer tme.close(t)

// The target keyspace is sharded. Let's remove the vschema definitions so
// that we know the workflow creation will fail.
// Let's also be sure that the routing rules are empty.
err := topotools.SaveRoutingRules(ctx, tme.wr.ts, nil)
require.NoError(t, err, "failed to save routing rules")
err = tme.ts.SaveVSchema(ctx, "ks2", nil)
require.NoError(t, err, "failed to save vschema")
err = tme.ts.RebuildSrvVSchema(ctx, nil)
require.NoError(t, err, "failed to rebuild serving vschema")
err = topotools.RebuildKeyspace(ctx, logutil.NewConsoleLogger(), tme.ts, "ks1", []string{"cell1"}, false)
require.NoError(t, err, "failed to rebuild keyspace")

err = tme.wr.MoveTables(ctx, "testwf", "ks1", "ks2", "t1,t2", "cell1", "primary,replica", false, "", true, false, "", false, false, "", "", nil)
require.Error(t, err)

// Check that there are no orphaned routing rules.
emptyRules := make(map[string][]string)
checkRouting(t, tme.wr, emptyRules)
}

func checkRouting(t *testing.T, wr *Wrangler, want map[string][]string) {
t.Helper()
ctx := context.Background()
Expand Down
Loading