Skip to content

Commit

Permalink
workflow: Modify the unit test for resharding workflow. Control the
Browse files Browse the repository at this point in the history
workflow through manager.
  • Loading branch information
wangyipei01 committed Feb 25, 2017
1 parent 4236de7 commit 1ecbdc3
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 124 deletions.
4 changes: 0 additions & 4 deletions go/vt/vtctld/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtctl"
"github.com/youtube/vitess/go/vt/workflow"
"github.com/youtube/vitess/go/vt/workflow/resharding"
"github.com/youtube/vitess/go/vt/workflow/topovalidator"
)

Expand All @@ -40,9 +39,6 @@ func initWorkflowManager(ts topo.Server) {
// Register the Schema Swap workflow.
schemaswap.RegisterWorkflowFactory()

// Register the Horizontal Resharding workflow.
resharding.Register()

// Unregister the blacklisted workflows.
for _, name := range workflowManagerDisable {
workflow.Unregister(name)
Expand Down
11 changes: 11 additions & 0 deletions go/vt/workflow/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,17 @@ func (m *Manager) Wait(ctx context.Context, uuid string) error {
return nil
}

// GetWorkflowForTesting returns the workflow within the runningworkflow
// by uuid. The method is used when injecting a mock interface into a manager
// created workflow in unit test.
func (m *Manager) GetWorkflowForTesting(uuid string) (Workflow, error) {
rw, err := m.getRunningWorkflow(uuid)
if err != nil {
return nil, err
}
return rw.workflow, nil
}

// getRunningWorkflow returns a runningWorkflow by uuid.
func (m *Manager) getRunningWorkflow(uuid string) (*runningWorkflow, error) {
m.mu.Lock()
Expand Down
8 changes: 5 additions & 3 deletions go/vt/workflow/resharding/horizontal_resharding_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ const (
phaseMigrateMaster PhaseType = "migrate_master"
)

// Register registers the HorizontalReshardingWorkflowFactory as a factory
// init registers the HorizontalReshardingWorkflowFactory as a factory
// in the workflow framework.
func Register() {
func init() {
workflow.Register(horizontalReshardingFactoryName, &HorizontalReshardingWorkflowFactory{})
}

Expand Down Expand Up @@ -315,7 +315,9 @@ func (hw *HorizontalReshardingWorkflow) Run(ctx context.Context, manager *workfl
hw.ctx = ctx
hw.topoServer = manager.TopoServer()
hw.manager = manager
hw.wr = wrangler.New(logutil.NewConsoleLogger(), manager.TopoServer(), tmclient.NewTabletManagerClient())
if hw.wr == nil {
hw.wr = wrangler.New(logutil.NewConsoleLogger(), manager.TopoServer(), tmclient.NewTabletManagerClient())
}
hw.wi = wi
hw.checkpointWriter = NewCheckpointWriter(hw.topoServer, hw.checkpoint, hw.wi)

Expand Down
223 changes: 106 additions & 117 deletions go/vt/workflow/resharding/horizontal_resharding_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,174 +3,163 @@ package resharding
import (
"context"
"flag"
"fmt"
"testing"

"github.com/golang/mock/gomock"
"github.com/youtube/vitess/go/vt/logutil"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/topo/memorytopo"
"github.com/youtube/vitess/go/vt/worker/fakevtworkerclient"
"github.com/youtube/vitess/go/vt/worker/vtworkerclient"
"github.com/youtube/vitess/go/vt/workflow"
"github.com/youtube/vitess/go/vt/wrangler"

topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
workflowpb "github.com/youtube/vitess/go/vt/proto/workflow"
)

var (
testKeyspace = "test_keyspace"
testVtworkers = "localhost:15032"
)

// TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow.
func TestHorizontalResharding(t *testing.T) {
// Set up the mock wrangler. It is used for the CopySchema and Migrate phase.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
mockWranglerInterface := setupMockWrangler(ctx, ctrl)

// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker()
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")

// Create a checkpoint with initialized tasks.
sourceShards := []string{"0"}
destinationShards := []string{"-80", "80-"}
vtworkers := []string{"localhost:15032"}
checkpoint, err := initCheckpointFromShards("test_keyspace", vtworkers, sourceShards, destinationShards)
if err != nil {
t.Errorf("initialize checkpoint fails: %v", err)
}

hw, err := createWorkflow(ctx, mockWranglerInterface, checkpoint)
if err != nil {
t.Errorf("initialize Workflow fails: %v", err)
}
if err := hw.runWorkflow(); err != nil {
t.Errorf("%s: Horizontal resharding workflow should not fail", err)
}

verifySuccess(t, hw.checkpoint)
}

// TestHorizontalReshardingRetry retries a stopped workflow,
// which the tasks are partially finished.
func TestHorizontalReshardingRetry(t *testing.T) {
// Set up mock wrangler. It is used for the CopySchema and Migrate phase.
// Set up the mock wrangler. It is used for the CopySchema,
// WaitforFilteredReplication and Migrate phase.
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := context.Background()
mockWranglerInterface := setupMockWranglerForRetry(ctx, ctrl)
mockWranglerInterface := setupMockWrangler(ctx, ctrl, testKeyspace)

// Set up fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker()
// Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase.
fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers)
vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory)
defer vtworkerclient.UnregisterFactoryForTest("fake")

// Create a checkpoint for the stopped workflow. For the stopped workflow,
// the task of copying schema to shard 80- succeed while the task of copying
// schema to shard -80 failed. The rest of tasks haven't been executed.
sourceShards := []string{"0"}
destinationShards := []string{"-80", "80-"}
vtworkers := []string{"localhost:15032"}
checkpoint, err := initCheckpointFromShards("test_keyspace", vtworkers, sourceShards, destinationShards)
// Initialize the topology.
ts := setupTopology(ctx, t, testKeyspace)
m := workflow.NewManager(ts)
// Run the manager in the background.
wg, cancel, _ := startManager(t, m)
// Create the workflow.
uuid, err := m.Create(context.Background(), horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + testVtworkers})
if err != nil {
t.Errorf("initialize checkpoint fails: %v", err)
t.Fatalf("cannot create testworkflow: %v", err)
}
setTaskSuccessOrFailure(checkpoint, createTaskID(phaseCopySchema, "80-"), true /* isSuccess*/)
setTaskSuccessOrFailure(checkpoint, createTaskID(phaseCopySchema, "-80"), false /* isSuccess*/)

hw, err := createWorkflow(ctx, mockWranglerInterface, checkpoint)
// Inject the mock wranger into the workflow.
w, err := m.GetWorkflowForTesting(uuid)
if err != nil {
t.Errorf("initialize Workflow fails: %v", err)
}
// Rerunning the workflow.
if err := hw.runWorkflow(); err != nil {
t.Errorf("%s: Horizontal resharding workflow should not fail", err)
t.Errorf("fail to get workflow from manager: %v", err)
}
hw := w.(*HorizontalReshardingWorkflow)
hw.wr = mockWranglerInterface

verifySuccess(t, hw.checkpoint)
}

func setTaskSuccessOrFailure(checkpoint *workflowpb.WorkflowCheckpoint, taskID string, isSuccess bool) {
t := checkpoint.Tasks[taskID]
t.State = workflowpb.TaskState_TaskDone
if !isSuccess {
t.Error = "failed"
} else {
t.Error = ""
// Start the job.
if err := m.Start(context.Background(), uuid); err != nil {
t.Fatalf("cannot start testworkflow: %v", err)
}
}

func createWorkflow(ctx context.Context, mockWranglerInterface *MockReshardingWrangler, checkpoint *workflowpb.WorkflowCheckpoint) (*HorizontalReshardingWorkflow, error) {
ts := memorytopo.NewServer("cell")
w := &workflowpb.Workflow{
Uuid: "test_hw",
FactoryName: horizontalReshardingFactoryName,
State: workflowpb.WorkflowState_NotStarted,
}
wi, err := ts.CreateWorkflow(ctx, w)
if err != nil {
return nil, err
}
hw := &HorizontalReshardingWorkflow{
ctx: ctx,
wr: mockWranglerInterface,
manager: workflow.NewManager(ts),
wi: wi,
topoServer: ts,
logger: logutil.NewMemoryLogger(),
checkpoint: checkpoint,
checkpointWriter: NewCheckpointWriter(ts, checkpoint, wi),
// Wait for the workflow to end.
m.Wait(context.Background(), uuid)
verifyWorkflowSuccess(context.Background(), t, ts, uuid) // TODO: implemented in parallel_runner_test.

// Stop the manager.
if err := m.Stop(context.Background(), uuid); err != nil {
t.Fatalf("cannot stop testworkflow: %v", err)
}
return hw, nil
cancel()
wg.Wait()
}

func setupFakeVtworker() *fakevtworkerclient.FakeVtworkerClient {
func setupFakeVtworker(keyspace, vtworkers string) *fakevtworkerclient.FakeVtworkerClient {
flag.Set("vtworker_client_protocol", "fake")
fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient()
fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitClone", "--min_healthy_rdonly_tablets=1", "test_keyspace/0"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/-80"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/80-"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=1", keyspace + "/0"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", keyspace + "/-80"}, "", nil)
fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", keyspace + "/80-"}, "", nil)
return fakeVtworkerClient
}

func setupMockWranglerForRetry(ctx context.Context, ctrl *gomock.Controller) *MockReshardingWrangler {
mockWranglerInterface := NewMockReshardingWrangler(ctrl)
// Set the expected behaviors for mock wrangler. copy schema to shard 80-
// should not be called.
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, "test_keyspace", "0", "test_keyspace", "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
func setupMockWrangler(ctx context.Context, ctrl *gomock.Controller, keyspace string) *MockReshardingWrangler {
// It is hard to set the expect call using the workflow's context since
// it is only assigned when calling workflow.Run. Therefore, we only check
// the type of the context argument in expect calls.
managerCtx, mCancel := context.WithCancel(ctx)
workflowCtx, wCancel := context.WithCancel(managerCtx)
defer wCancel()
defer mCancel()

servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_MASTER}
for _, servedType := range servedTypeParams {
mockWranglerInterface.EXPECT().MigrateServedTypes(ctx, "test_keyspace", "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil)
}
return mockWranglerInterface
}
ctxMatcher := typeMatcher{workflowCtx}

func setupMockWrangler(ctx context.Context, ctrl *gomock.Controller) *MockReshardingWrangler {
mockWranglerInterface := NewMockReshardingWrangler(ctrl)
// Set the expected behaviors for mock wrangler.
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, "test_keyspace", "0", "test_keyspace", "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, "test_keyspace", "0", "test_keyspace", "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctxMatcher, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil)
mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctxMatcher, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil)

mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctxMatcher, keyspace, "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)
mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctxMatcher, keyspace, "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil)

servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY,
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_MASTER}
for _, servedType := range servedTypeParams {
mockWranglerInterface.EXPECT().MigrateServedTypes(ctx, "test_keyspace", "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil)
mockWranglerInterface.EXPECT().MigrateServedTypes(ctxMatcher, keyspace, "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil)
}
return mockWranglerInterface
}

func verifySuccess(t *testing.T, checkpoint *workflowpb.WorkflowCheckpoint) {
for _, task := range checkpoint.Tasks {
if task.State != workflowpb.TaskState_TaskDone || task.Error != "" {
t.Fatalf("task: %v should succeed: task status: %v, %v", task.Id, task.State, task.Error)
func setupTopology(ctx context.Context, t *testing.T, keyspace string) topo.Server {
ts := memorytopo.NewServer("cell")
if err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}); err != nil {
t.Errorf("CreateKeyspace: %v", err)
}
createShard(ctx, t, ts.Impl, keyspace, "0", true)
createShard(ctx, t, ts.Impl, keyspace, "-80", false)
createShard(ctx, t, ts.Impl, keyspace, "80-", false)
return ts
}

func createShard(ctx context.Context, t *testing.T, ts topo.Impl, keyspace, name string, isServing bool) {
shard := &topodatapb.Shard{
KeyRange: newKeyRange(name),
}
if isServing {
servedTypes := map[topodatapb.TabletType]bool{
topodatapb.TabletType_MASTER: true,
topodatapb.TabletType_REPLICA: true,
topodatapb.TabletType_RDONLY: true,
}
for st := range servedTypes {
shard.ServedTypes = append(shard.ServedTypes, &topodatapb.Shard_ServedType{
TabletType: st,
})
}
}

if err := ts.CreateShard(ctx, keyspace, name, shard); err != nil {
t.Fatalf("CreateShard fails: %v", err)
}
}

func newKeyRange(value string) *topodatapb.KeyRange {
_, result, err := topo.ValidateShardName(value)
if err != nil {
panic(fmt.Sprintf("BUG: invalid shard name to initialize key range: %v", err))
}
return result
}

// typeMatcher implements the interface gomock.Matcher.
type typeMatcher struct {
x interface{}
}

// Matches check whether the type of the variables are the same.
func (t typeMatcher) Matches(x interface{}) bool {
return fmt.Sprintf("%T", t.x) == fmt.Sprintf("%T", x)
}

func (t typeMatcher) String() string {
return fmt.Sprintf("has the same type with %v", t.x)
}

0 comments on commit 1ecbdc3

Please sign in to comment.