Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MoveTables: update vschema while moving tables with autoincrement from sharded to unsharded #9288

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,28 @@ func TestBasicV2Workflows(t *testing.T) {
log.Flush()
}

func waitForWorkflowToStart(t *testing.T, ksWorkflow string) {
done := false
ticker := time.NewTicker(10 * time.Millisecond)
log.Infof("Waiting for workflow %s to start", ksWorkflow)
for {
select {
case <-ticker.C:
if done {
return
}
output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", ksWorkflow, "show")
require.NoError(t, err)
if strings.Contains(output, "\"State\": \"Running\"") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, maybe better to use a RegEx or JSON parsing so that we are checking for what we really care about while resilient to whitespace and other minor format changes? Full JSON parsing is probably overkill, maybe RegEx is too :-)

Feel free to ignore.

done = true
log.Infof("Workflow %s has started", ksWorkflow)
}
case <-time.After(5 * time.Second):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nit, any reason to have the timeout this low? We risk causing the entire action/test to fail. I'm not sure more time is really needed, just curious. We could also make the wait time a parameter.

Feel free to ignore.

require.FailNow(t, "workflow %s not yet started", ksWorkflow)
}
}
}

/*
testVSchemaForSequenceAfterMoveTables checks that the related sequence tag is migrated correctly in the vschema
while moving a table with an auto-increment from sharded to unsharded.
Expand All @@ -258,7 +280,9 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"customer2", workflowActionCreate, "", "", "")
require.NoError(t, err)
time.Sleep(1 * time.Second)

waitForWorkflowToStart(t, "customer.wf2")

err = tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs,
"", workflowActionSwitchTraffic, "", "", "")
require.NoError(t, err)
Expand All @@ -283,12 +307,14 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
execVtgateQuery(t, vtgateConn, "customer", "insert into customer2(name) values('a')")
}
validateCount(t, vtgateConn, "customer", "customer2", 3+num)
want := fmt.Sprintf("[[INT32(%d)]]", 100+num-1)
validateQuery(t, vtgateConn, "customer", "select max(cid) from customer2", want)

// use MoveTables to move customer2 back to product. Note that now the table has an associated sequence
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"customer2", workflowActionCreate, "", "", "")
require.NoError(t, err)
time.Sleep(1 * time.Second)
waitForWorkflowToStart(t, "product.wf3")
err = tstWorkflowExec(t, defaultCellName, "wf3", targetKs, sourceKs,
"", workflowActionSwitchTraffic, "", "", "")
require.NoError(t, err)
Expand All @@ -311,6 +337,8 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')")
}
validateCount(t, vtgateConn, "product", "customer2", 3+num+num)
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1)
validateQuery(t, vtgateConn, "product", "select max(cid) from customer2", want)
}

func testReshardV2Workflow(t *testing.T) {
Expand Down
3 changes: 0 additions & 3 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ func (wr *Wrangler) addTablesToVSchema(ctx context.Context, sourceKeyspace strin
if err != nil {
return err
}
if srcVSchema == nil {
return fmt.Errorf("no vschema found for source keyspace %s", sourceKeyspace)
}
for _, table := range tables {
srcTable, ok := srcVSchema.Tables[table]
if ok {
Expand Down