Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication Workflow Create: Check for Copying state while waiting for streams to start #9206

Merged
merged 2 commits into from
Nov 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestMigrate(t *testing.T) {
"-source=ext1.rating", "create", ksWorkflow); err != nil {
t.Fatalf("Migrate command failed with %+v : %s\n", err, output)
}
time.Sleep(1 * time.Second) // wait for migrate to run
expectNumberOfStreams(t, vtgateConn, "migrate", "e1", "product:0", 1)
validateCount(t, vtgateConn, "product:0", "rating", 2)
validateCount(t, vtgateConn, "product:0", "review", 3)
Expand Down
10 changes: 5 additions & 5 deletions go/vt/vtctl/vtctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2540,7 +2540,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
wr.Logger().Printf("Waiting for workflow to start:\n")

type streamCount struct {
total, running int64
total, started int64
}
errCh := make(chan error)
wfErrCh := make(chan []*wrangler.WorkflowError)
Expand All @@ -2557,7 +2557,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
errCh <- fmt.Errorf("workflow did not start within %s", (*timeout).String())
return
case <-ticker.C:
totalStreams, runningStreams, workflowErrors, err := wf.GetStreamCount()
totalStreams, startedStreams, workflowErrors, err := wf.GetStreamCount()
if err != nil {
errCh <- err
close(errCh)
Expand All @@ -2568,7 +2568,7 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
}
progressCh <- &streamCount{
total: totalStreams,
running: runningStreams,
started: startedStreams,
}
}
}
Expand All @@ -2577,12 +2577,12 @@ func commandVRWorkflow(ctx context.Context, wr *wrangler.Wrangler, subFlags *fla
for {
select {
case progress := <-progressCh:
if progress.running == progress.total {
if progress.started == progress.total {
wr.Logger().Printf("\nWorkflow started successfully with %d stream(s)\n", progress.total)
printDetails()
return nil
}
wr.Logger().Printf("%d%% ... ", 100*progress.running/progress.total)
wr.Logger().Printf("%d%% ... ", 100*progress.started/progress.total)
case err := <-errCh:
wr.Logger().Error(err)
cancelTimedCtx()
Expand Down
12 changes: 6 additions & 6 deletions go/vt/wrangler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,33 +210,33 @@ func NewWorkflowError(tablet string, id int64, description string) *WorkflowErro
return wfErr
}

// GetStreamCount returns a count of total and running streams and any stream errors
// GetStreamCount returns a count of total streams and of streams that have started processing
func (vrw *VReplicationWorkflow) GetStreamCount() (int64, int64, []*WorkflowError, error) {
var err error
var workflowErrors []*WorkflowError
var totalStreams, runningStreams int64
var total, started int64
res, err := vrw.wr.ShowWorkflow(vrw.ctx, vrw.params.Workflow, vrw.params.TargetKeyspace)
if err != nil {
return 0, 0, nil, err
}
for ksShard := range res.ShardStatuses {
statuses := res.ShardStatuses[ksShard].PrimaryReplicationStatuses
for _, st := range statuses {
totalStreams++
total++
if strings.HasPrefix(st.Message, "Error:") {
workflowErrors = append(workflowErrors, NewWorkflowError(st.Tablet, st.ID, st.Message))
continue
}
if st.Pos == "" {
continue
}
if st.State == "Running" {
runningStreams++
if st.State == "Running" || st.State == "Copying" {
started++
}
}
}

return totalStreams, runningStreams, workflowErrors, nil
return total, started, workflowErrors, nil
}

// SwitchTraffic switches traffic in the direction passed for specified tablet_types
Expand Down