From 3372287c80fd7b1e2adade6a7c6dac1653eb2373 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 26 Nov 2021 13:27:16 +0100 Subject: [PATCH 1/4] Set sequence if defined for a table in the vschema when moving tables to unsharded keyspace Signed-off-by: Rohit Nayak --- .../resharding_workflows_v2_test.go | 73 ++++++++++++++++++- .../vreplication/vreplication_test.go | 1 + go/vt/wrangler/materializer.go | 32 ++++++-- 3 files changed, 99 insertions(+), 7 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index f7e1e69b502..8bcbc6e3567 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/wrangler" @@ -244,6 +246,73 @@ func TestBasicV2Workflows(t *testing.T) { log.Flush() } +/* +testVSchemaForSequenceAfterMoveTables checks that the related sequence tag is migrated correctly in the vschema +while moving a table with an auto-increment from sharded to unsharded. +*/ +func testVSchemaForSequenceAfterMoveTables(t *testing.T) { + // at this point the unsharded product and sharded customer keyspaces are created by previous tests + + // use MoveTables to move customer2 from product to customer using + currentWorkflowType = wrangler.MoveTablesWorkflow + err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, + "customer2", workflowActionCreate, "", "", "") + require.NoError(t, err) + time.Sleep(1 * time.Second) + err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, + "", workflowActionSwitchTraffic, "", "", "") + require.NoError(t, err) + err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, + "", workflowActionComplete, "", "", "") + require.NoError(t, err) + + // sanity check + output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + require.NoError(t, err) + assert.NotContains(t, output, "customer2\"", "customer2 still found in keyspace product") + validateCount(t, vtgateConn, "customer", "customer2", 3) + + // check that customer2 has the sequence tag + output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "customer") + require.NoError(t, err) + assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 sequence missing in keyspace customer") + + // ensure sequence is available to vtgate + num := 5 + for i := 0; i < num; i++ { + execVtgateQuery(t, vtgateConn, "customer", "insert into customer2(name) values('a')") + } + validateCount(t, vtgateConn, "customer", "customer2", 3+num) + + // use MoveTables to move customer2 back to product. Note that now the table has an associated sequence + err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, + "customer2", workflowActionCreate, "", "", "") + require.NoError(t, err) + time.Sleep(1 * time.Second) + err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, + "", workflowActionSwitchTraffic, "", "", "") + require.NoError(t, err) + err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, + "", workflowActionComplete, "", "", "") + require.NoError(t, err) + + // sanity check + output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + require.NoError(t, err) + assert.Contains(t, output, "customer2\"", "customer2 not found in keyspace product ") + + // check that customer2 still has the sequence tag + output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + require.NoError(t, err) + assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 still found in keyspace product") + + // ensure sequence is available to vtgate + for i := 0; i < num; i++ { + execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')") + } + validateCount(t, vtgateConn, "product", "customer2", 3+num+num) +} + func testReshardV2Workflow(t *testing.T) { currentWorkflowType = wrangler.ReshardWorkflow @@ -276,7 +345,9 @@ func testMoveTablesV2Workflow(t *testing.T) { output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) require.Contains(t, output, "No workflows found in keyspace customer") - createMoveTablesWorkflow(t, "customer2") + testVSchemaForSequenceAfterMoveTables(t) + + createMoveTablesWorkflow(t, "tenant") output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1") diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 7884fce5c97..442c5141594 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -203,6 +203,7 @@ func insertInitialData(t *testing.T) { execMultipleQueries(t, vtgateConn, "product:0", string(lines)) execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);") execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);") + execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);") log.Infof("Done inserting initial data") validateCount(t, vtgateConn, "product:0", "product", 2) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 6fefdafea45..c29ad6d026c 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -65,6 +65,28 @@ const ( createDDLAsCopyDropConstraint = "copy:drop_constraint" ) +func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, vschema *vschemapb.Keyspace, tables []string) error { + if vschema.Tables == nil { + vschema.Tables = make(map[string]*vschemapb.Table) + } + srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace) + if err != nil { + return err + } + if srcVSchema == nil { + return fmt.Errorf("no vschema found for source keyspace %s", sourceKeyspace) + } + for _, table := range tables { + tbl := &vschemapb.Table{} + srcTable, ok := srcVSchema.Tables[table] + if ok { + tbl.AutoIncrement = srcTable.AutoIncrement + } + vschema.Tables[table] = tbl + } + return nil +} + // MoveTables initiates moving table(s) over to another keyspace func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs, cell, tabletTypes string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool, @@ -74,7 +96,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta var externalTopo *topo.Server var err error - if externalCluster != "" { + if externalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command externalTopo, err = wr.ts.OpenExternalVitessClusterServer(ctx, externalCluster) if err != nil { return err @@ -82,6 +104,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta wr.sourceTs = externalTopo log.Infof("Successfully opened external topo: %+v", externalTopo) } + var vschema *vschemapb.Keyspace vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace) if err != nil { @@ -150,11 +173,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta log.Infof("Found tables to move: %s", strings.Join(tables, ",")) if !vschema.Sharded { - if vschema.Tables == nil { - vschema.Tables = make(map[string]*vschemapb.Table) - } - for _, table := range tables { - vschema.Tables[table] = &vschemapb.Table{} + if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables); err != nil { + return err } } } From 9ac52665beb7d4dd0b0b0f0beb82fb421483bbf8 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 26 Nov 2021 14:18:06 +0100 Subject: [PATCH 2/4] Do not copy sequence info for a migrate workflow Signed-off-by: Rohit Nayak --- go/vt/wrangler/materializer.go | 41 +++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index c29ad6d026c..064c2ac7f4c 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -65,24 +65,33 @@ const ( createDDLAsCopyDropConstraint = "copy:drop_constraint" ) -func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, vschema *vschemapb.Keyspace, tables []string) error { - if vschema.Tables == nil { - vschema.Tables = make(map[string]*vschemapb.Table) - } - srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace) - if err != nil { - return err - } - if srcVSchema == nil { - return fmt.Errorf("no vschema found for source keyspace %s", sourceKeyspace) +// addTablesToVSchema adds tables to an (unsharded) vschema. Depending on copyAttributes It will also add any sequence info +// that is associated with a table by copying it from the vschema of the source keyspace. +// For a migrate workflow we do not copy attributes since the source keyspace is just a proxy to import data into Vitess +// Todo: For now we only copy sequence but later we may also want to copy other attributes like authoritative column flag and list of columns +func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyAttributes bool) error { + if targetVSchema.Tables == nil { + targetVSchema.Tables = make(map[string]*vschemapb.Table) } for _, table := range tables { - tbl := &vschemapb.Table{} - srcTable, ok := srcVSchema.Tables[table] - if ok { - tbl.AutoIncrement = srcTable.AutoIncrement + targetVSchema.Tables[table] = &vschemapb.Table{} + } + + if copyAttributes { // if source keyspace is provided, copy over the sequence info. + srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace) + if err != nil { + return err } - vschema.Tables[table] = tbl + if srcVSchema == nil { + return fmt.Errorf("no vschema found for source keyspace %s", sourceKeyspace) + } + for _, table := range tables { + srcTable, ok := srcVSchema.Tables[table] + if ok { + targetVSchema.Tables[table].AutoIncrement = srcTable.AutoIncrement + } + } + } return nil } @@ -173,7 +182,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta log.Infof("Found tables to move: %s", strings.Join(tables, ",")) if !vschema.Sharded { - if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables); err != nil { + if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil { return err } } From cab2a7e1ba6a7fd1c49ff4c0786c89c33f13015c Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 5 Dec 2021 15:08:46 +0100 Subject: [PATCH 3/4] Address review comments Signed-off-by: Rohit Nayak --- .../resharding_workflows_v2_test.go | 32 +++++++++++++++++-- go/vt/wrangler/materializer.go | 3 -- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 8bcbc6e3567..0fd42b2ccf8 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -246,6 +246,28 @@ func TestBasicV2Workflows(t *testing.T) { log.Flush() } +func waitForWorkflowToStart(t *testing.T, ksWorkflow string) { + done := false + ticker := time.NewTicker(10 * time.Millisecond) + log.Infof("Waiting for workflow %s to start", ksWorkflow) + for { + select { + case <-ticker.C: + if done { + return + } + output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") + require.NoError(t, err) + if strings.Contains(output, "\"State\": \"Running\"") { + done = true + log.Infof("Workflow %s has started", ksWorkflow) + } + case <-time.After(5 * time.Second): + require.FailNow(t, "workflow %s not yet started", ksWorkflow) + } + } +} + /* testVSchemaForSequenceAfterMoveTables checks that the related sequence tag is migrated correctly in the vschema while moving a table with an auto-increment from sharded to unsharded. @@ -258,7 +280,9 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, "customer2", workflowActionCreate, "", "", "") require.NoError(t, err) - time.Sleep(1 * time.Second) + + waitForWorkflowToStart(t, "customer.wf2") + err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, "", workflowActionSwitchTraffic, "", "", "") require.NoError(t, err) @@ -283,12 +307,14 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { execVtgateQuery(t, vtgateConn, "customer", "insert into customer2(name) values('a')") } validateCount(t, vtgateConn, "customer", "customer2", 3+num) + want := fmt.Sprintf("[[INT32(%d)]]", 100+num-1) + validateQuery(t, vtgateConn, "customer", "select max(cid) from customer2", want) // use MoveTables to move customer2 back to product. Note that now the table has an associated sequence err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, "customer2", workflowActionCreate, "", "", "") require.NoError(t, err) - time.Sleep(1 * time.Second) + waitForWorkflowToStart(t, "product.wf3") err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs, "", workflowActionSwitchTraffic, "", "", "") require.NoError(t, err) @@ -311,6 +337,8 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')") } validateCount(t, vtgateConn, "product", "customer2", 3+num+num) + want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1) + validateQuery(t, vtgateConn, "product", "select max(cid) from customer2", want) } func testReshardV2Workflow(t *testing.T) { diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index 064c2ac7f4c..353df8e1ab7 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -82,9 +82,6 @@ func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace strin if err != nil { return err } - if srcVSchema == nil { - return fmt.Errorf("no vschema found for source keyspace %s", sourceKeyspace) - } for _, table := range tables { srcTable, ok := srcVSchema.Tables[table] if ok { From dd042cf4670c71527bc731004251e403a1db754f Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 9 Dec 2021 11:38:32 +0100 Subject: [PATCH 4/4] Address review comments Signed-off-by: Rohit Nayak --- go.mod | 3 ++ go.sum | 6 ++++ .../resharding_workflows_v2_test.go | 35 +++++++++++++++---- 3 files changed, 37 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 2d9ea709f44..e05cee0f695 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,7 @@ require ( github.com/stretchr/testify v1.7.0 github.com/tchap/go-patricia v2.2.6+incompatible github.com/tebeka/selenium v0.9.9 + github.com/tidwall/gjson v1.12.1 github.com/tinylib/msgp v1.1.1 // indirect github.com/uber-go/atomic v1.4.0 // indirect github.com/uber/jaeger-client-go v2.16.0+incompatible @@ -174,6 +175,8 @@ require ( github.com/spf13/cast v1.3.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/subosito/gotenv v1.2.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.0.1 // indirect go.opencensus.io v0.23.0 // indirect diff --git a/go.sum b/go.sum index 08faa0c6b25..81483926518 100644 --- a/go.sum +++ b/go.sum @@ -740,7 +740,13 @@ github.com/tchap/go-patricia v2.2.6+incompatible h1:JvoDL7JSoIP2HDE8AbDH3zC8QBPx github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ23RP/odRBOTVjwp2cDyi6I= github.com/tebeka/selenium v0.9.9 h1:cNziB+etNgyH/7KlNI7RMC1ua5aH1+5wUlFQyzeMh+w= github.com/tebeka/selenium v0.9.9/go.mod h1:5Fr8+pUvU6B1OiPfkdCKdXZyr5znvVkxuPd0NOdZCQc= +github.com/tidwall/gjson v1.12.1 h1:ikuZsLdhr8Ws0IdROXUS1Gi4v9Z4pGqpX/CvJkxvfpo= +github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/tinylib/msgp v1.1.1 h1:TnCZ3FIuKeaIy+F45+Cnp+caqdXGy4z74HvwXN+570Y= github.com/tinylib/msgp v1.1.1/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 0fd42b2ccf8..6d7d2e3541a 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/tidwall/gjson" + "github.com/stretchr/testify/assert" "vitess.io/vitess/go/vt/log" @@ -246,24 +248,43 @@ func TestBasicV2Workflows(t *testing.T) { log.Flush() } +const workflowStartTimeout = 5 * time.Second + func waitForWorkflowToStart(t *testing.T, ksWorkflow string) { done := false - ticker := time.NewTicker(10 * time.Millisecond) + ticker := time.NewTicker(100 * time.Millisecond) + timer := time.NewTimer(workflowStartTimeout) log.Infof("Waiting for workflow %s to start", ksWorkflow) for { select { case <-ticker.C: if done { + log.Infof("Workflow %s has started", ksWorkflow) return } output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show") require.NoError(t, err) - if strings.Contains(output, "\"State\": \"Running\"") { - done = true - log.Infof("Workflow %s has started", ksWorkflow) - } - case <-time.After(5 * time.Second): - require.FailNow(t, "workflow %s not yet started", ksWorkflow) + done = true + state := "" + result := gjson.Get(output, "ShardStatuses") + result.ForEach(func(tabletId, tabletStreams gjson.Result) bool { // for each participating tablet + tabletStreams.ForEach(func(streamId, streamInfos gjson.Result) bool { // for each stream + if streamId.String() == "PrimaryReplicationStatuses" { + streamInfos.ForEach(func(attributeKey, attributeValue gjson.Result) bool { // for each attribute in the stream + state = attributeValue.Get("State").String() + if state != "Running" { + done = false // we need to wait for all streams to start + } + return true + }) + } + return true + }) + return true + }) + + case <-timer.C: + require.FailNowf(t, "workflow %s not yet started", ksWorkflow) } } }