Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement VDiff2 Delete Action #10608

Merged
merged 9 commits into from
Jul 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 55 additions & 11 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -162,21 +163,64 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)

// This is done here so that we have a valid workflow to test the commands against
if tc.testCLIErrors {
t.Run("Client error handling", func(t *testing.T) {
_, output := performVDiff2Action(t, ksWorkflow, allCellNames, "badcmd", "", true)
require.Contains(t, output, "usage:")
_, output = performVDiff2Action(t, ksWorkflow, allCellNames, "create", "invalid_uuid", true)
require.Contains(t, output, "please provide a valid UUID")
_, output = performVDiff2Action(t, ksWorkflow, allCellNames, "resume", "invalid_uuid", true)
require.Contains(t, output, "can only resume a specific vdiff, please provide a valid UUID")
uuid, _ := performVDiff2Action(t, ksWorkflow, allCellNames, "show", "last", false)
_, output = performVDiff2Action(t, ksWorkflow, allCellNames, "create", uuid, true)
require.Contains(t, output, "already exists")
})
testCLIErrors(t, ksWorkflow, allCellNames)
}

testDelete(t, ksWorkflow, allCellNames)

// create another VDiff record to confirm it gets deleted when the workflow is completed
ts := time.Now()
uuid, _ := performVDiff2Action(t, ksWorkflow, allCellNames, "create", "", false)
waitForVDiff2ToComplete(t, ksWorkflow, allCellNames, uuid, ts)

err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow)
require.NoError(t, err)
err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "Complete", ksWorkflow)
require.NoError(t, err)

// confirm the VDiff data is deleted for the workflow
testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards)
}

func testCLIErrors(t *testing.T, ksWorkflow, cells string) {
t.Run("Client error handling", func(t *testing.T) {
_, output := performVDiff2Action(t, ksWorkflow, cells, "badcmd", "", true)
require.Contains(t, output, "usage:")
_, output = performVDiff2Action(t, ksWorkflow, cells, "create", "invalid_uuid", true)
require.Contains(t, output, "please provide a valid UUID")
_, output = performVDiff2Action(t, ksWorkflow, cells, "resume", "invalid_uuid", true)
require.Contains(t, output, "can only resume a specific vdiff, please provide a valid UUID")
_, output = performVDiff2Action(t, ksWorkflow, cells, "delete", "invalid_uuid", true)
require.Contains(t, output, "can only delete a specific vdiff, please provide a valid UUID")
uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false)
_, output = performVDiff2Action(t, ksWorkflow, cells, "create", uuid, true)
require.Contains(t, output, "already exists")
})
}

func testDelete(t *testing.T, ksWorkflow, cells string) {
t.Run("Delete", func(t *testing.T) {
// test show verbose too as a side effect
uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false, "--verbose")
// only present with --verbose
require.Contains(t, output, `"TableSummary":`)
_, output = performVDiff2Action(t, ksWorkflow, cells, "delete", uuid, false)
require.Contains(t, output, `"Status": "completed"`)
_, output = performVDiff2Action(t, ksWorkflow, cells, "delete", "all", false)
require.Contains(t, output, `"Status": "completed"`)
_, output = performVDiff2Action(t, ksWorkflow, cells, "show", "all", false)
require.Equal(t, "[]\n", output)
})
}

func testNoOrphanedData(t *testing.T, keyspace, workflow string, shards []string) {
t.Run("No orphaned data", func(t *testing.T) {
query := fmt.Sprintf("select vd.id as vdiff_id, vdt.vdiff_id as vdiff_table_id, vdl.vdiff_id as vdiff_log_id from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id) where vd.keyspace = %s and vd.workflow = %s",
encodeString(keyspace), encodeString(workflow))
for _, shard := range shards {
res, err := vc.getPrimaryTablet(t, keyspace, shard).QueryTablet(query, keyspace, false)
require.NoError(t, err)
require.Equal(t, 0, len(res.Rows))
}
})
}
22 changes: 18 additions & 4 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/buger/jsonparser"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/wrangler"
)
Expand Down Expand Up @@ -162,15 +163,22 @@ func vdiff2Resume(t *testing.T, keyspace, workflow, cells string, expectedRows i
require.Equal(t, expectedRows, info.RowsCompared)
}

func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool) (uuid string, output string) {
func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) {
var err error
output, err = vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v2", "--tablet_types=primary", "--source_cell="+cells, "--format=json", ksWorkflow, action, actionArg)
if len(extraFlags) > 0 {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v2", "--tablet_types=primary", "--source_cell="+cells, "--format=json",
strings.Join(extraFlags, " "), ksWorkflow, action, actionArg)
} else {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v2", "--tablet_types=primary", "--source_cell="+cells, "--format=json", ksWorkflow, action, actionArg)
}
log.Infof("vdiff2 output: %+v (err: %+v)", output, err)
if !expectError {
require.Nil(t, err)
uuid, err = jsonparser.GetString([]byte(output), "UUID")
require.NoError(t, err)
require.NotEmpty(t, uuid)
if action != "delete" && !(action == "show" && actionArg == "all") { // a UUID is not required
require.NoError(t, err)
require.NotEmpty(t, uuid)
}
}
return uuid, output
}
Expand Down Expand Up @@ -198,3 +206,9 @@ func getVDiffInfo(jsonStr string) *vdiffInfo {

return &info
}

func encodeString(in string) string {
var buf strings.Builder
sqltypes.NewVarChar(in).EncodeSQL(&buf)
return buf.String()
}
50 changes: 40 additions & 10 deletions go/vt/vtctl/vdiff2.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl

debugQuery := subFlags.Bool("debug_query", false, "Adds a mysql query to the report that can be used for further debugging")
onlyPks := subFlags.Bool("only_pks", false, "When reporting missing rows, only show primary keys in the report.")
format := subFlags.String("format", "", "Format of report") // "json" or ""
var format string
subFlags.StringVar(&format, "format", "text", "Format of report") // "json" or "text"
rohit-nayak-ps marked this conversation as resolved.
Show resolved Hide resolved
maxExtraRowsToCompare := subFlags.Int64("max_extra_rows_to_compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.")

resumable := subFlags.Bool("resumable", false, "Should this vdiff retry in case of recoverable errors, not yet implemented")
checksum := subFlags.Bool("checksum", false, "Use row-level checksums to compare, not yet implemented")
samplePct := subFlags.Int64("sample_pct", 100, "How many rows to sample, not yet implemented")
verbose := subFlags.Bool("verbose", false, "Show verbose vdiff output in summaries")

if err := subFlags.Parse(args); err != nil {
return err
}
format = strings.ToLower(format)

var action vdiff.VDiffAction
var actionArg string

Expand Down Expand Up @@ -112,7 +116,7 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
OnlyPKS: *onlyPks,
DebugQuery: *debugQuery,
Format: *format,
Format: format,
},
}

Expand Down Expand Up @@ -141,6 +145,15 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
if err != nil {
return fmt.Errorf("can only resume a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", keyspace, workflowName)
}
case vdiff.DeleteAction:
switch actionArg {
case vdiff.AllActionArg:
default:
vdiffUUID, err = uuid.Parse(actionArg)
if err != nil {
return fmt.Errorf("can only delete a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", keyspace, workflowName)
}
}
default:
return fmt.Errorf("invalid action %s; %s", action, usage)
}
Expand All @@ -155,15 +168,17 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl

switch action {
case vdiff.CreateAction, vdiff.ResumeAction:
displayVDiff2ScheduledResponse(wr, *format, vdiffUUID.String(), action)
displayVDiff2ScheduledResponse(wr, format, vdiffUUID.String(), action)
case vdiff.ShowAction:
if output == nil {
// should not happen
return fmt.Errorf("invalid (empty) response from show command")
}
if err := displayVDiff2ShowResponse(wr, *format, keyspace, workflowName, actionArg, output); err != nil {
if err := displayVDiff2ShowResponse(wr, format, keyspace, workflowName, actionArg, output, *verbose); err != nil {
return err
}
case vdiff.DeleteAction:
displayVDiff2ActionStatusResponse(wr, format, action, vdiff.CompletedState)
default:
return fmt.Errorf("invalid action %s; %s", action, usage)
}
Expand Down Expand Up @@ -264,7 +279,7 @@ func displayListings(listings []*VDiffListing) string {
return str
}

func displayVDiff2ShowResponse(wr *wrangler.Wrangler, format, keyspace, workflowName, actionArg string, output *wrangler.VDiffOutput) error {
func displayVDiff2ShowResponse(wr *wrangler.Wrangler, format, keyspace, workflowName, actionArg string, output *wrangler.VDiffOutput, verbose bool) error {
var vdiffUUID uuid.UUID
var err error
switch actionArg {
Expand Down Expand Up @@ -294,7 +309,7 @@ func displayVDiff2ShowResponse(wr *wrangler.Wrangler, format, keyspace, workflow
if len(output.Responses) == 0 {
return fmt.Errorf("no response received for vdiff show of %s.%s(%s)", keyspace, workflowName, vdiffUUID.String())
}
return displayVDiff2ShowSingleSummary(wr, format, keyspace, workflowName, vdiffUUID.String(), output)
return displayVDiff2ShowSingleSummary(wr, format, keyspace, workflowName, vdiffUUID.String(), output, verbose)
}
}

Expand Down Expand Up @@ -342,9 +357,9 @@ func buildVDiff2Recent(output *wrangler.VDiffOutput) ([]*VDiffListing, error) {
return listings, nil
}

func displayVDiff2ShowSingleSummary(wr *wrangler.Wrangler, format, keyspace, workflowName, uuid string, output *wrangler.VDiffOutput) error {
func displayVDiff2ShowSingleSummary(wr *wrangler.Wrangler, format, keyspace, workflowName, uuid string, output *wrangler.VDiffOutput, verbose bool) error {
str := ""
summary, err := buildVDiff2SingleSummary(wr, keyspace, workflowName, uuid, output)
summary, err := buildVDiff2SingleSummary(wr, keyspace, workflowName, uuid, output, verbose)
if err != nil {
return err
}
Expand Down Expand Up @@ -376,7 +391,7 @@ func displayVDiff2ShowSingleSummary(wr *wrangler.Wrangler, format, keyspace, wor
wr.Logger().Printf(str + "\n")
return nil
}
func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid string, output *wrangler.VDiffOutput) (*vdiffSummary, error) {
func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid string, output *wrangler.VDiffOutput, verbose bool) (*vdiffSummary, error) {
summary := &vdiffSummary{
Workflow: workflow,
Keyspace: keyspace,
Expand Down Expand Up @@ -494,7 +509,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
summary.Shards = strings.Join(shards, ",")
summary.TableSummaryMap = tableSummaryMap
summary.Reports = reports
if !summary.HasMismatch {
if !summary.HasMismatch && !verbose {
summary.Reports = nil
summary.TableSummaryMap = nil
}
Expand Down Expand Up @@ -524,3 +539,18 @@ func displayVDiff2ScheduledResponse(wr *wrangler.Wrangler, format string, uuid s
wr.Logger().Printf(msg)
}
}

func displayVDiff2ActionStatusResponse(wr *wrangler.Wrangler, format string, action vdiff.VDiffAction, status vdiff.VDiffState) {
if format == "json" {
type ActionStatusResponse struct {
Action vdiff.VDiffAction
Status vdiff.VDiffState
}
resp := &ActionStatusResponse{Action: action, Status: status}
jsonText, _ := json.MarshalIndent(resp, "", "\t")
wr.Logger().Printf(string(jsonText) + "\n")
} else {
msg := fmt.Sprintf("VDiff %s status is %s on target shards\n", action, status)
wr.Logger().Printf(msg)
}
}
18 changes: 17 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ const (
CreateAction VDiffAction = "create"
ShowAction VDiffAction = "show"
ResumeAction VDiffAction = "resume"
DeleteAction VDiffAction = "delete"
AllActionArg = "all"
LastActionArg = "last"
)

var (
Actions = []VDiffAction{CreateAction, ShowAction, ResumeAction}
Actions = []VDiffAction{CreateAction, ShowAction, ResumeAction, DeleteAction}
ActionArgs = []string{AllActionArg, LastActionArg}
)

Expand Down Expand Up @@ -165,6 +166,21 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat
return nil, fmt.Errorf("action argument %s not supported", req.SubCommand)
}
}
case DeleteAction:
query := ""
switch req.SubCommand {
case AllActionArg:
query = fmt.Sprintf(sqlDeleteVDiffs, encodeString(req.Keyspace), encodeString(req.Workflow))
default:
uuid, err := uuid.Parse(req.SubCommand)
if err != nil {
return nil, fmt.Errorf("action argument %s not supported", req.SubCommand)
}
query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(uuid.String()))
}
if _, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("action %s not supported", action)
}
Expand Down
17 changes: 11 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,17 @@ const (
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %s and workflow = %s and vdiff_uuid = %s"
sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit 1"
sqlGetVDiffByID = "select * from _vt.vdiff where id = %d"
sqlVDiffSummary = `select vd.state as vdiff_state, vdt.table_name as table_name,
vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows,
vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at,
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vdt.vdiff_id = %d`
sqlDeleteVDiffs = `delete from vd, vdt, vdl using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
inner join _vt.vdiff_log as vdl on (vd.id = vdl.vdiff_id)
where vd.keyspace = %s and vd.workflow = %s`
sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
and vd.keyspace = %s and vd.workflow = %s and vd.vdiff_uuid = %s`
sqlVDiffSummary = `select vd.state as vdiff_state, vdt.table_name as table_name,
vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows,
vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at,
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vdt.vdiff_id = %d`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s %s where id = %d"
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
Expand Down
Loading