Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MoveTables: update vschema while moving tables with autoincrement from sharded to unsharded #9288

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 100 additions & 1 deletion go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/wrangler"
Expand Down Expand Up @@ -244,6 +246,101 @@ func TestBasicV2Workflows(t *testing.T) {
log.Flush()
}

func waitForWorkflowToStart(t *testing.T, ksWorkflow string) {
done := false
ticker := time.NewTicker(10 * time.Millisecond)
log.Infof("Waiting for workflow %s to start", ksWorkflow)
for {
select {
case <-ticker.C:
if done {
return
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
if strings.Contains(output, "\"State\": \"Running\"") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, maybe better to use a RegEx or JSON parsing so that we are checking for what we really care about while resilient to whitespace and other minor format changes? Full JSON parsing is probably overkill, maybe RegEx is too :-)

Feel free to ignore.

done = true
log.Infof("Workflow %s has started", ksWorkflow)
}
case <-time.After(5 * time.Second):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, any reason to have the timeout this low? We risk causing the entire action/test to fail. I'm not sure more time is really needed, just curious. We could also make the wait time a parameter.

Feel free to ignore.

require.FailNow(t, "workflow %s not yet started", ksWorkflow)
}
}
}

/*
testVSchemaForSequenceAfterMoveTables checks that the related sequence tag is migrated correctly in the vschema
while moving a table with an auto-increment from sharded to unsharded.
*/
func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
// at this point the unsharded product and sharded customer keyspaces are created by previous tests

// use MoveTables to move customer2 from product to customer using
currentWorkflowType = wrangler.MoveTablesWorkflow
err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"customer2", workflowActionCreate, "", "", "")
require.NoError(t, err)

waitForWorkflowToStart(t, "customer.wf2")

err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"", workflowActionSwitchTraffic, "", "", "")
require.NoError(t, err)
err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"", workflowActionComplete, "", "", "")
require.NoError(t, err)

// sanity check
output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product")
require.NoError(t, err)
assert.NotContains(t, output, "customer2\"", "customer2 still found in keyspace product")
validateCount(t, vtgateConn, "customer", "customer2", 3)

// check that customer2 has the sequence tag
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "customer")
require.NoError(t, err)
assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 sequence missing in keyspace customer")

// ensure sequence is available to vtgate
num := 5
for i := 0; i < num; i++ {
execVtgateQuery(t, vtgateConn, "customer", "insert into customer2(name) values('a')")
}
validateCount(t, vtgateConn, "customer", "customer2", 3+num)
want := fmt.Sprintf("[[INT32(%d)]]", 100+num-1)
validateQuery(t, vtgateConn, "customer", "select max(cid) from customer2", want)

// use MoveTables to move customer2 back to product. Note that now the table has an associated sequence
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"customer2", workflowActionCreate, "", "", "")
require.NoError(t, err)
waitForWorkflowToStart(t, "product.wf3")
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"", workflowActionSwitchTraffic, "", "", "")
require.NoError(t, err)
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"", workflowActionComplete, "", "", "")
require.NoError(t, err)

// sanity check
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product")
require.NoError(t, err)
assert.Contains(t, output, "customer2\"", "customer2 not found in keyspace product ")

// check that customer2 still has the sequence tag
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product")
require.NoError(t, err)
assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 still found in keyspace product")

// ensure sequence is available to vtgate
for i := 0; i < num; i++ {
execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')")
}
validateCount(t, vtgateConn, "product", "customer2", 3+num+num)
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1)
validateQuery(t, vtgateConn, "product", "select max(cid) from customer2", want)
}

func testReshardV2Workflow(t *testing.T) {
currentWorkflowType = wrangler.ReshardWorkflow

Expand Down Expand Up @@ -276,7 +373,9 @@ func testMoveTablesV2Workflow(t *testing.T) {
output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "No workflows found in keyspace customer")

createMoveTablesWorkflow(t, "customer2")
testVSchemaForSequenceAfterMoveTables(t)

createMoveTablesWorkflow(t, "tenant")
output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...)
require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1")

Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func insertInitialData(t *testing.T) {
execMultipleQueries(t, vtgateConn, "product:0", string(lines))
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into order_seq(id, next_id, cache) values(0, 100, 100);")
execVtgateQuery(t, vtgateConn, "product:0", "insert into customer_seq2(id, next_id, cache) values(0, 100, 100);")
log.Infof("Done inserting initial data")

validateCount(t, vtgateConn, "product:0", "product", 2)
Expand Down
38 changes: 32 additions & 6 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,34 @@ const (
createDDLAsCopyDropConstraint = "copy:drop_constraint"
)

// addTablesToVSchema adds tables to an (unsharded) vschema. Depending on copyAttributes It will also add any sequence info
// that is associated with a table by copying it from the vschema of the source keyspace.
// For a migrate workflow we do not copy attributes since the source keyspace is just a proxy to import data into Vitess
// Todo: For now we only copy sequence but later we may also want to copy other attributes like authoritative column flag and list of columns
func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace string, targetVSchema *vschemapb.Keyspace, tables []string, copyAttributes bool) error {
if targetVSchema.Tables == nil {
targetVSchema.Tables = make(map[string]*vschemapb.Table)
}
for _, table := range tables {
targetVSchema.Tables[table] = &vschemapb.Table{}
}

if copyAttributes { // if source keyspace is provided, copy over the sequence info.
srcVSchema, err := wr.ts.GetVSchema(ctx, sourceKeyspace)
if err != nil {
return err
}
for _, table := range tables {
srcTable, ok := srcVSchema.Tables[table]
if ok {
targetVSchema.Tables[table].AutoIncrement = srcTable.AutoIncrement
}
}

}
return nil
}

// MoveTables initiates moving table(s) over to another keyspace
func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, targetKeyspace, tableSpecs,
cell, tabletTypes string, allTables bool, excludeTables string, autoStart, stopAfterCopy bool,
Expand All @@ -74,14 +102,15 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
var externalTopo *topo.Server
var err error

if externalCluster != "" {
if externalCluster != "" { // when the source is an external mysql cluster mounted using the Mount command
externalTopo, err = wr.ts.OpenExternalVitessClusterServer(ctx, externalCluster)
if err != nil {
return err
}
wr.sourceTs = externalTopo
log.Infof("Successfully opened external topo: %+v", externalTopo)
}

var vschema *vschemapb.Keyspace
vschema, err = wr.ts.GetVSchema(ctx, targetKeyspace)
if err != nil {
Expand Down Expand Up @@ -150,11 +179,8 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
log.Infof("Found tables to move: %s", strings.Join(tables, ","))

if !vschema.Sharded {
if vschema.Tables == nil {
vschema.Tables = make(map[string]*vschemapb.Table)
}
for _, table := range tables {
vschema.Tables[table] = &vschemapb.Table{}
if err := wr.addTablesToVSchema(ctx, sourceKeyspace, vschema, tables, externalTopo == nil); err != nil {
return err
}
}
}
Expand Down