Skip to content

Commit

Permalink
Implement VDiff2 delete command
Browse files Browse the repository at this point in the history
Also add --verbose flag for VDiff output

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 28, 2022
1 parent 05e88ca commit 4a25442
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 21 deletions.
9 changes: 9 additions & 0 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,15 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, cells []*Cell)
})
}

// test show verbose and delete
_, output := performVDiff2Action(t, ksWorkflow, allCellNames, "show", "last", false, "--verbose")
// only present with --verbose
require.Contains(t, output, `"TableSummary":`)
_, output = performVDiff2Action(t, ksWorkflow, allCellNames, "delete", "all", false)
require.Contains(t, output, `"Status": "completed"`)
_, output = performVDiff2Action(t, ksWorkflow, allCellNames, "show", "all", false)
require.Equal(t, "[]\n", output)

err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow)
require.NoError(t, err)
err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "Complete", ksWorkflow)
Expand Down
15 changes: 11 additions & 4 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,22 @@ func vdiff2Resume(t *testing.T, keyspace, workflow, cells string, expectedRows i
require.Equal(t, expectedRows, info.RowsCompared)
}

func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool) (uuid string, output string) {
func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) {
var err error
output, err = vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v2", "--tablet_types=primary", "--source_cell="+cells, "--format=json", ksWorkflow, action, actionArg)
if len(extraFlags) > 0 {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v2", "--tablet_types=primary", "--source_cell="+cells, "--format=json",
strings.Join(extraFlags, " "), ksWorkflow, action, actionArg)
} else {
output, err = vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v2", "--tablet_types=primary", "--source_cell="+cells, "--format=json", ksWorkflow, action, actionArg)
}
log.Infof("vdiff2 output: %+v (err: %+v)", output, err)
if !expectError {
require.Nil(t, err)
uuid, err = jsonparser.GetString([]byte(output), "UUID")
require.NoError(t, err)
require.NotEmpty(t, uuid)
if action != "delete" && !(action == "show" && actionArg == "all") { // a UUID is not required
require.NoError(t, err)
require.NotEmpty(t, uuid)
}
}
return uuid, output
}
Expand Down
49 changes: 39 additions & 10 deletions go/vt/vtctl/vdiff2.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl

debugQuery := subFlags.Bool("debug_query", false, "Adds a mysql query to the report that can be used for further debugging")
onlyPks := subFlags.Bool("only_pks", false, "When reporting missing rows, only show primary keys in the report.")
format := subFlags.String("format", "", "Format of report") // "json" or ""
var format string
subFlags.StringVar(&format, "format", "text", "Format of report") // "json" or "text"
format = strings.ToLower(format)
maxExtraRowsToCompare := subFlags.Int64("max_extra_rows_to_compare", 1000, "If there are collation differences between the source and target, you can have rows that are identical but simply returned in a different order from MySQL. We will do a second pass to compare the rows for any actual differences in this case and this flag allows you to control the resources used for this operation.")

resumable := subFlags.Bool("resumable", false, "Should this vdiff retry in case of recoverable errors, not yet implemented")
checksum := subFlags.Bool("checksum", false, "Use row-level checksums to compare, not yet implemented")
samplePct := subFlags.Int64("sample_pct", 100, "How many rows to sample, not yet implemented")
verbose := subFlags.Bool("verbose", false, "Show full vdiff output in summaries (only applicable when using JSON format)")

if err := subFlags.Parse(args); err != nil {
return err
Expand Down Expand Up @@ -112,7 +115,7 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
OnlyPKS: *onlyPks,
DebugQuery: *debugQuery,
Format: *format,
Format: format,
},
}

Expand Down Expand Up @@ -141,6 +144,15 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl
if err != nil {
return fmt.Errorf("can only resume a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", keyspace, workflowName)
}
case vdiff.DeleteAction:
switch actionArg {
case vdiff.AllActionArg:
default:
vdiffUUID, err = uuid.Parse(actionArg)
if err != nil {
return fmt.Errorf("can only delete a specific vdiff, please provide a valid UUID; view all with: VDiff -- --v2 %s.%s show all", keyspace, workflowName)
}
}
default:
return fmt.Errorf("invalid action %s; %s", action, usage)
}
Expand All @@ -155,15 +167,17 @@ func commandVDiff2(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag.Fl

switch action {
case vdiff.CreateAction, vdiff.ResumeAction:
displayVDiff2ScheduledResponse(wr, *format, vdiffUUID.String(), action)
displayVDiff2ScheduledResponse(wr, format, vdiffUUID.String(), action)
case vdiff.ShowAction:
if output == nil {
// should not happen
return fmt.Errorf("invalid (empty) response from show command")
}
if err := displayVDiff2ShowResponse(wr, *format, keyspace, workflowName, actionArg, output); err != nil {
if err := displayVDiff2ShowResponse(wr, format, keyspace, workflowName, actionArg, output, *verbose); err != nil {
return err
}
case vdiff.DeleteAction:
displayVDiff2ActionStatusResponse(wr, format, action, vdiff.CompletedState)
default:
return fmt.Errorf("invalid action %s; %s", action, usage)
}
Expand Down Expand Up @@ -264,7 +278,7 @@ func displayListings(listings []*VDiffListing) string {
return str
}

func displayVDiff2ShowResponse(wr *wrangler.Wrangler, format, keyspace, workflowName, actionArg string, output *wrangler.VDiffOutput) error {
func displayVDiff2ShowResponse(wr *wrangler.Wrangler, format, keyspace, workflowName, actionArg string, output *wrangler.VDiffOutput, verbose bool) error {
var vdiffUUID uuid.UUID
var err error
switch actionArg {
Expand Down Expand Up @@ -294,7 +308,7 @@ func displayVDiff2ShowResponse(wr *wrangler.Wrangler, format, keyspace, workflow
if len(output.Responses) == 0 {
return fmt.Errorf("no response received for vdiff show of %s.%s(%s)", keyspace, workflowName, vdiffUUID.String())
}
return displayVDiff2ShowSingleSummary(wr, format, keyspace, workflowName, vdiffUUID.String(), output)
return displayVDiff2ShowSingleSummary(wr, format, keyspace, workflowName, vdiffUUID.String(), output, verbose)
}
}

Expand Down Expand Up @@ -342,9 +356,9 @@ func buildVDiff2Recent(output *wrangler.VDiffOutput) ([]*VDiffListing, error) {
return listings, nil
}

func displayVDiff2ShowSingleSummary(wr *wrangler.Wrangler, format, keyspace, workflowName, uuid string, output *wrangler.VDiffOutput) error {
func displayVDiff2ShowSingleSummary(wr *wrangler.Wrangler, format, keyspace, workflowName, uuid string, output *wrangler.VDiffOutput, verbose bool) error {
str := ""
summary, err := buildVDiff2SingleSummary(wr, keyspace, workflowName, uuid, output)
summary, err := buildVDiff2SingleSummary(wr, keyspace, workflowName, uuid, output, verbose)
if err != nil {
return err
}
Expand Down Expand Up @@ -376,7 +390,7 @@ func displayVDiff2ShowSingleSummary(wr *wrangler.Wrangler, format, keyspace, wor
wr.Logger().Printf(str + "\n")
return nil
}
func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid string, output *wrangler.VDiffOutput) (*vdiffSummary, error) {
func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid string, output *wrangler.VDiffOutput, verbose bool) (*vdiffSummary, error) {
summary := &vdiffSummary{
Workflow: workflow,
Keyspace: keyspace,
Expand Down Expand Up @@ -494,7 +508,7 @@ func buildVDiff2SingleSummary(wr *wrangler.Wrangler, keyspace, workflow, uuid st
summary.Shards = strings.Join(shards, ",")
summary.TableSummaryMap = tableSummaryMap
summary.Reports = reports
if !summary.HasMismatch {
if !summary.HasMismatch && !verbose {
summary.Reports = nil
summary.TableSummaryMap = nil
}
Expand Down Expand Up @@ -524,3 +538,18 @@ func displayVDiff2ScheduledResponse(wr *wrangler.Wrangler, format string, uuid s
wr.Logger().Printf(msg)
}
}

func displayVDiff2ActionStatusResponse(wr *wrangler.Wrangler, format string, action vdiff.VDiffAction, status vdiff.VDiffState) {
if format == "json" {
type ActionStatusResponse struct {
Action vdiff.VDiffAction
Status vdiff.VDiffState
}
resp := &ActionStatusResponse{Action: action, Status: status}
jsonText, _ := json.MarshalIndent(resp, "", "\t")
wr.Logger().Printf(string(jsonText) + "\n")
} else {
msg := fmt.Sprintf("VDiff %s is %s on target shards\n", action, status)
wr.Logger().Printf(msg)
}
}
18 changes: 17 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ const (
CreateAction VDiffAction = "create"
ShowAction VDiffAction = "show"
ResumeAction VDiffAction = "resume"
DeleteAction VDiffAction = "delete"
AllActionArg = "all"
LastActionArg = "last"
)

var (
Actions = []VDiffAction{CreateAction, ShowAction, ResumeAction}
Actions = []VDiffAction{CreateAction, ShowAction, ResumeAction, DeleteAction}
ActionArgs = []string{AllActionArg, LastActionArg}
)

Expand Down Expand Up @@ -165,6 +166,21 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat
return nil, fmt.Errorf("action argument %s not supported", req.SubCommand)
}
}
case DeleteAction:
query := ""
switch req.SubCommand {
case AllActionArg:
query = fmt.Sprintf(sqlDeleteVDiffs, encodeString(req.Keyspace), encodeString(req.Workflow))
default:
uuid, err := uuid.Parse(req.SubCommand)
if err != nil {
return nil, fmt.Errorf("action argument %s not supported", req.SubCommand)
}
query = fmt.Sprintf(sqlDeleteVDiffByUUID, encodeString(req.Keyspace), encodeString(req.Workflow), encodeString(uuid.String()))
}
if _, err = withDDL.Exec(context.Background(), query, dbClient.ExecuteFetch, dbClient.ExecuteFetch); err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("action %s not supported", action)
}
Expand Down
17 changes: 11 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,17 @@ const (
sqlGetVDiffByKeyspaceWorkflowUUID = "select * from _vt.vdiff where keyspace = %s and workflow = %s and vdiff_uuid = %s"
sqlGetMostRecentVDiff = "select * from _vt.vdiff where keyspace = %s and workflow = %s order by id desc limit 1"
sqlGetVDiffByID = "select * from _vt.vdiff where id = %d"
sqlVDiffSummary = `select vd.state as vdiff_state, vdt.table_name as table_name,
vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows,
vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at,
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vdt.vdiff_id = %d`
// sqlDeleteVDiffs has a placeholder for any query predicates -- deleting all VDiffs by default
sqlDeleteVDiffs = `delete from vd, vdt using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) where
vd.keyspace = %s and vd.workflow = %s`
sqlDeleteVDiffByUUID = `delete from vd, vdt using _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
and vd.keyspace = %s and vd.workflow = %s and vd.vdiff_uuid = %s`
sqlVDiffSummary = `select vd.state as vdiff_state, vdt.table_name as table_name,
vd.vdiff_uuid as 'uuid', vdt.state as table_state, vdt.table_rows as table_rows,
vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at,
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vdt.vdiff_id = %d`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s %s where id = %d"
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
Expand Down

0 comments on commit 4a25442

Please sign in to comment.