diff --git a/go/vt/vtctl/vtctl.go b/go/vt/vtctl/vtctl.go index 0be416ff661..0b970412071 100644 --- a/go/vt/vtctl/vtctl.go +++ b/go/vt/vtctl/vtctl.go @@ -478,7 +478,7 @@ var commands = []commandGroup{ "Workflow", []command{ {"Workflow", commandWorkflow, " --dry-run", - "Start/Stop/Delete Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start", + "Start/Stop/Delete/List/ListAll Workflow on all target tablets in workflow. Example: Workflow merchant.morders Start", }, }, }, @@ -2946,23 +2946,28 @@ func commandWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *flag. return err } if subFlags.NArg() != 2 { - return fmt.Errorf("usage: Workflow --dry-run keyspace.workflow start/stop/delete") + return fmt.Errorf("usage: Workflow --dry-run keyspace.workflow start/stop/delete/list/list-all") } - keyspace, workflow, err := splitKeyspaceWorkflow(subFlags.Arg(0)) - if err != nil { - return err + keyspace := subFlags.Arg(0) + action := strings.ToLower(subFlags.Arg(1)) + var workflow string + var err error + if action != "listall" { + keyspace, workflow, err = splitKeyspaceWorkflow(subFlags.Arg(0)) + if err != nil { + return err + } } _, err = wr.TopoServer().GetKeyspace(ctx, keyspace) if err != nil { wr.Logger().Errorf("Keyspace %s not found", keyspace) } - action := subFlags.Arg(1) results, err := wr.WorkflowAction(ctx, workflow, keyspace, action, *dryRun) if err != nil { return err } - if action == "list" { + if action == "list" || action == "listall" { return nil } if len(results) == 0 { diff --git a/go/vt/wrangler/vexec.go b/go/vt/wrangler/vexec.go index e978811e63d..a2bbbdba919 100644 --- a/go/vt/wrangler/vexec.go +++ b/go/vt/wrangler/vexec.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/util/sets" + "vitess.io/vitess/go/vt/log" "github.com/gogo/protobuf/proto" @@ -188,10 +190,13 @@ func (vx *vexec) getMasterForShard(shard string) (*topo.TabletInfo, error) { return master, nil } -// WorkflowAction can start/stop/delete or list strams in _vt.vreplication on all masters in the target keyspace of the workflow +// WorkflowAction can start/stop/delete or list streams in _vt.vreplication on all masters in the target keyspace of the workflow. func (wr *Wrangler) WorkflowAction(ctx context.Context, workflow, keyspace, action string, dryRun bool) (map[*topo.TabletInfo]*sqltypes.Result, error) { if action == "list" { return nil, wr.listStreams(ctx, workflow, keyspace) + } else if action == "listall" { + _, err := wr.ListAllWorkflows(ctx, keyspace) + return nil, err } results, err := wr.execWorkflowAction(ctx, workflow, keyspace, action, dryRun) retResults := make(map[*topo.TabletInfo]*sqltypes.Result) @@ -344,6 +349,31 @@ func (wr *Wrangler) getStreams(ctx context.Context, workflow, keyspace string) ( return &rsr, nil } +// ListAllWorkflows will return a list of all active workflows for the given keyspace. +func (wr *Wrangler) ListAllWorkflows(ctx context.Context, keyspace string) ([]string, error) { + query := "select distinct workflow from _vt.vreplication where state <> 'Stopped'" + results, err := wr.runVexec(ctx, "", keyspace, query, false) + if err != nil { + return nil, err + } + workflowsSet := sets.NewString() + for _, result := range results { + if len(result.Rows) == 0 { + continue + } + qr := sqltypes.Proto3ToResult(result) + for _, row := range qr.Rows { + for _, value := range row { + // Even though we query for distinct, we must de-dup because we query per master tablet. + workflowsSet.Insert(value.ToString()) + } + } + } + workflows := workflowsSet.List() + wr.printWorkflowList(workflows) + return workflows, nil +} + func (wr *Wrangler) listStreams(ctx context.Context, workflow, keyspace string) error { replStatus, err := wr.getStreams(ctx, workflow, keyspace) if err != nil { @@ -376,6 +406,11 @@ func dumpStreamListAsJSON(replStatus *replicationStatusResult, wr *Wrangler) err return nil } +func (wr *Wrangler) printWorkflowList(workflows []string) { + list := strings.Join(workflows, ", ") + wr.Logger().Printf("Workflows: %v", list) +} + func (wr *Wrangler) getCopyState(ctx context.Context, tablet *topo.TabletInfo, id int64) ([]copyState, error) { var cs []copyState query := fmt.Sprintf(`select table_name, lastpk from _vt.copy_state where vrepl_id = %d`, id) diff --git a/go/vt/wrangler/vexec_plan.go b/go/vt/wrangler/vexec_plan.go index a61a986d679..6499d71ebdf 100644 --- a/go/vt/wrangler/vexec_plan.go +++ b/go/vt/wrangler/vexec_plan.go @@ -118,7 +118,7 @@ func (vx *vexec) addDefaultWheres(where *sqlparser.Where) *sqlparser.Where { } } } - if !hasWorkflow { + if !hasWorkflow && vx.workflow != "" { expr := &sqlparser.ComparisonExpr{ Left: &sqlparser.ColName{Name: sqlparser.NewColIdent("workflow")}, Operator: sqlparser.EqualStr, diff --git a/go/vt/wrangler/vexec_test.go b/go/vt/wrangler/vexec_test.go index f0a15e0841f..754bb2f0418 100644 --- a/go/vt/wrangler/vexec_test.go +++ b/go/vt/wrangler/vexec_test.go @@ -292,6 +292,20 @@ will be run on the following streams in keyspace target for workflow wrWorkflow: require.Equal(t, dryRunResult, logger.String()) } +func TestWorkflowListAll(t *testing.T) { + ctx := context.Background() + keyspace := "target" + workflow := "wrWorkflow" + env := newWranglerTestEnv([]string{"0"}, []string{"-80", "80-"}, "", nil) + defer env.close() + logger := logutil.NewMemoryLogger() + wr := New(logger, env.topoServ, env.tmc) + + workflows, err := wr.ListAllWorkflows(ctx, keyspace) + require.Nil(t, err) + require.Equal(t, []string{workflow}, workflows) +} + func TestVExecValidations(t *testing.T) { ctx := context.Background() workflow := "wf" diff --git a/go/vt/wrangler/wrangler_env_test.go b/go/vt/wrangler/wrangler_env_test.go index fd2c9af4c3f..92cd904e88b 100644 --- a/go/vt/wrangler/wrangler_env_test.go +++ b/go/vt/wrangler/wrangler_env_test.go @@ -149,7 +149,6 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit fmt.Sprintf("1|%v|pos||0|Running|vt_target|1234|", bls), ) env.tmc.setVRResults(master.tablet, "select id, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, message from _vt.vreplication where db_name = 'vt_target' and workflow = 'wrWorkflow'", result) - //" env.tmc.setVRResults( master.tablet, "select source, pos from _vt.vreplication where db_name='vt_target' and workflow='wrWorkflow'", @@ -159,6 +158,12 @@ func newWranglerTestEnv(sourceShards, targetShards []string, query string, posit posRows..., ), ) + result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "workflow", + "varchar"), + "wrWorkflow", + ) + env.tmc.setVRResults(master.tablet, "select distinct workflow from _vt.vreplication where state != 'Stopped' and db_name = 'vt_target'", result) result = sqltypes.MakeTestResult(sqltypes.MakeTestFields( "table|lastpk",