From 1ecbdc32e34078365b72386c844d6d74c95b793b Mon Sep 17 00:00:00 2001 From: Yipei Wang Date: Fri, 24 Feb 2017 17:04:14 -0800 Subject: [PATCH] workflow: Modify the unit test for resharding workflow. Control the workflow through manager. --- go/vt/vtctld/workflow.go | 4 - go/vt/workflow/manager.go | 11 + .../horizontal_resharding_workflow.go | 8 +- .../horizontal_resharding_workflow_test.go | 223 +++++++++--------- 4 files changed, 122 insertions(+), 124 deletions(-) diff --git a/go/vt/vtctld/workflow.go b/go/vt/vtctld/workflow.go index d74921b1626..ea6b36f4848 100644 --- a/go/vt/vtctld/workflow.go +++ b/go/vt/vtctld/workflow.go @@ -13,7 +13,6 @@ import ( "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/vtctl" "github.com/youtube/vitess/go/vt/workflow" - "github.com/youtube/vitess/go/vt/workflow/resharding" "github.com/youtube/vitess/go/vt/workflow/topovalidator" ) @@ -40,9 +39,6 @@ func initWorkflowManager(ts topo.Server) { // Register the Schema Swap workflow. schemaswap.RegisterWorkflowFactory() - // Register the Horizontal Resharding workflow. - resharding.Register() - // Unregister the blacklisted workflows. for _, name := range workflowManagerDisable { workflow.Unregister(name) diff --git a/go/vt/workflow/manager.go b/go/vt/workflow/manager.go index c366270444b..04f78873a50 100644 --- a/go/vt/workflow/manager.go +++ b/go/vt/workflow/manager.go @@ -459,6 +459,17 @@ func (m *Manager) Wait(ctx context.Context, uuid string) error { return nil } +// GetWorkflowForTesting returns the workflow within the runningworkflow +// by uuid. The method is used when injecting a mock interface into a manager +// created workflow in unit test. +func (m *Manager) GetWorkflowForTesting(uuid string) (Workflow, error) { + rw, err := m.getRunningWorkflow(uuid) + if err != nil { + return nil, err + } + return rw.workflow, nil +} + // getRunningWorkflow returns a runningWorkflow by uuid. func (m *Manager) getRunningWorkflow(uuid string) (*runningWorkflow, error) { m.mu.Lock() diff --git a/go/vt/workflow/resharding/horizontal_resharding_workflow.go b/go/vt/workflow/resharding/horizontal_resharding_workflow.go index ca397240845..34224bb9893 100644 --- a/go/vt/workflow/resharding/horizontal_resharding_workflow.go +++ b/go/vt/workflow/resharding/horizontal_resharding_workflow.go @@ -44,9 +44,9 @@ const ( phaseMigrateMaster PhaseType = "migrate_master" ) -// Register registers the HorizontalReshardingWorkflowFactory as a factory +// init registers the HorizontalReshardingWorkflowFactory as a factory // in the workflow framework. -func Register() { +func init() { workflow.Register(horizontalReshardingFactoryName, &HorizontalReshardingWorkflowFactory{}) } @@ -315,7 +315,9 @@ func (hw *HorizontalReshardingWorkflow) Run(ctx context.Context, manager *workfl hw.ctx = ctx hw.topoServer = manager.TopoServer() hw.manager = manager - hw.wr = wrangler.New(logutil.NewConsoleLogger(), manager.TopoServer(), tmclient.NewTabletManagerClient()) + if hw.wr == nil { + hw.wr = wrangler.New(logutil.NewConsoleLogger(), manager.TopoServer(), tmclient.NewTabletManagerClient()) + } hw.wi = wi hw.checkpointWriter = NewCheckpointWriter(hw.topoServer, hw.checkpoint, hw.wi) diff --git a/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go b/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go index 1d9c151eebb..d5c532a4068 100644 --- a/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go +++ b/go/vt/workflow/resharding/horizontal_resharding_workflow_test.go @@ -3,10 +3,11 @@ package resharding import ( "context" "flag" + "fmt" "testing" "github.com/golang/mock/gomock" - "github.com/youtube/vitess/go/vt/logutil" + "github.com/youtube/vitess/go/vt/topo" "github.com/youtube/vitess/go/vt/topo/memorytopo" "github.com/youtube/vitess/go/vt/worker/fakevtworkerclient" "github.com/youtube/vitess/go/vt/worker/vtworkerclient" @@ -14,163 +15,151 @@ import ( "github.com/youtube/vitess/go/vt/wrangler" topodatapb "github.com/youtube/vitess/go/vt/proto/topodata" - workflowpb "github.com/youtube/vitess/go/vt/proto/workflow" +) + +var ( + testKeyspace = "test_keyspace" + testVtworkers = "localhost:15032" ) // TestHorizontalResharding runs the happy path of HorizontalReshardingWorkflow. func TestHorizontalResharding(t *testing.T) { - // Set up the mock wrangler. It is used for the CopySchema and Migrate phase. - ctrl := gomock.NewController(t) - defer ctrl.Finish() ctx := context.Background() - mockWranglerInterface := setupMockWrangler(ctx, ctrl) - - // Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase. - fakeVtworkerClient := setupFakeVtworker() - vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) - defer vtworkerclient.UnregisterFactoryForTest("fake") - - // Create a checkpoint with initialized tasks. - sourceShards := []string{"0"} - destinationShards := []string{"-80", "80-"} - vtworkers := []string{"localhost:15032"} - checkpoint, err := initCheckpointFromShards("test_keyspace", vtworkers, sourceShards, destinationShards) - if err != nil { - t.Errorf("initialize checkpoint fails: %v", err) - } - - hw, err := createWorkflow(ctx, mockWranglerInterface, checkpoint) - if err != nil { - t.Errorf("initialize Workflow fails: %v", err) - } - if err := hw.runWorkflow(); err != nil { - t.Errorf("%s: Horizontal resharding workflow should not fail", err) - } - - verifySuccess(t, hw.checkpoint) -} -// TestHorizontalReshardingRetry retries a stopped workflow, -// which the tasks are partially finished. -func TestHorizontalReshardingRetry(t *testing.T) { - // Set up mock wrangler. It is used for the CopySchema and Migrate phase. + // Set up the mock wrangler. It is used for the CopySchema, + // WaitforFilteredReplication and Migrate phase. ctrl := gomock.NewController(t) defer ctrl.Finish() - ctx := context.Background() - mockWranglerInterface := setupMockWranglerForRetry(ctx, ctrl) + mockWranglerInterface := setupMockWrangler(ctx, ctrl, testKeyspace) - // Set up fakeworkerclient. It is used at SplitClone and SplitDiff phase. - fakeVtworkerClient := setupFakeVtworker() + // Set up the fakeworkerclient. It is used at SplitClone and SplitDiff phase. + fakeVtworkerClient := setupFakeVtworker(testKeyspace, testVtworkers) vtworkerclient.RegisterFactory("fake", fakeVtworkerClient.FakeVtworkerClientFactory) defer vtworkerclient.UnregisterFactoryForTest("fake") - // Create a checkpoint for the stopped workflow. For the stopped workflow, - // the task of copying schema to shard 80- succeed while the task of copying - // schema to shard -80 failed. The rest of tasks haven't been executed. - sourceShards := []string{"0"} - destinationShards := []string{"-80", "80-"} - vtworkers := []string{"localhost:15032"} - checkpoint, err := initCheckpointFromShards("test_keyspace", vtworkers, sourceShards, destinationShards) + // Initialize the topology. + ts := setupTopology(ctx, t, testKeyspace) + m := workflow.NewManager(ts) + // Run the manager in the background. + wg, cancel, _ := startManager(t, m) + // Create the workflow. + uuid, err := m.Create(context.Background(), horizontalReshardingFactoryName, []string{"-keyspace=" + testKeyspace, "-vtworkers=" + testVtworkers}) if err != nil { - t.Errorf("initialize checkpoint fails: %v", err) + t.Fatalf("cannot create testworkflow: %v", err) } - setTaskSuccessOrFailure(checkpoint, createTaskID(phaseCopySchema, "80-"), true /* isSuccess*/) - setTaskSuccessOrFailure(checkpoint, createTaskID(phaseCopySchema, "-80"), false /* isSuccess*/) - - hw, err := createWorkflow(ctx, mockWranglerInterface, checkpoint) + // Inject the mock wranger into the workflow. + w, err := m.GetWorkflowForTesting(uuid) if err != nil { - t.Errorf("initialize Workflow fails: %v", err) - } - // Rerunning the workflow. - if err := hw.runWorkflow(); err != nil { - t.Errorf("%s: Horizontal resharding workflow should not fail", err) + t.Errorf("fail to get workflow from manager: %v", err) } + hw := w.(*HorizontalReshardingWorkflow) + hw.wr = mockWranglerInterface - verifySuccess(t, hw.checkpoint) -} - -func setTaskSuccessOrFailure(checkpoint *workflowpb.WorkflowCheckpoint, taskID string, isSuccess bool) { - t := checkpoint.Tasks[taskID] - t.State = workflowpb.TaskState_TaskDone - if !isSuccess { - t.Error = "failed" - } else { - t.Error = "" + // Start the job. + if err := m.Start(context.Background(), uuid); err != nil { + t.Fatalf("cannot start testworkflow: %v", err) } -} -func createWorkflow(ctx context.Context, mockWranglerInterface *MockReshardingWrangler, checkpoint *workflowpb.WorkflowCheckpoint) (*HorizontalReshardingWorkflow, error) { - ts := memorytopo.NewServer("cell") - w := &workflowpb.Workflow{ - Uuid: "test_hw", - FactoryName: horizontalReshardingFactoryName, - State: workflowpb.WorkflowState_NotStarted, - } - wi, err := ts.CreateWorkflow(ctx, w) - if err != nil { - return nil, err - } - hw := &HorizontalReshardingWorkflow{ - ctx: ctx, - wr: mockWranglerInterface, - manager: workflow.NewManager(ts), - wi: wi, - topoServer: ts, - logger: logutil.NewMemoryLogger(), - checkpoint: checkpoint, - checkpointWriter: NewCheckpointWriter(ts, checkpoint, wi), + // Wait for the workflow to end. + m.Wait(context.Background(), uuid) + verifyWorkflowSuccess(context.Background(), t, ts, uuid) // TODO: implemented in parallel_runner_test. + + // Stop the manager. + if err := m.Stop(context.Background(), uuid); err != nil { + t.Fatalf("cannot stop testworkflow: %v", err) } - return hw, nil + cancel() + wg.Wait() } -func setupFakeVtworker() *fakevtworkerclient.FakeVtworkerClient { +func setupFakeVtworker(keyspace, vtworkers string) *fakevtworkerclient.FakeVtworkerClient { flag.Set("vtworker_client_protocol", "fake") fakeVtworkerClient := fakevtworkerclient.NewFakeVtworkerClient() - fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitClone", "--min_healthy_rdonly_tablets=1", "test_keyspace/0"}, "", nil) - fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/-80"}, "", nil) - fakeVtworkerClient.RegisterResultForAddr("localhost:15032", []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", "test_keyspace/80-"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitClone", "--min_healthy_rdonly_tablets=1", keyspace + "/0"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", keyspace + "/-80"}, "", nil) + fakeVtworkerClient.RegisterResultForAddr(vtworkers, []string{"SplitDiff", "--min_healthy_rdonly_tablets=1", keyspace + "/80-"}, "", nil) return fakeVtworkerClient } -func setupMockWranglerForRetry(ctx context.Context, ctrl *gomock.Controller) *MockReshardingWrangler { - mockWranglerInterface := NewMockReshardingWrangler(ctrl) - // Set the expected behaviors for mock wrangler. copy schema to shard 80- - // should not be called. - mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, "test_keyspace", "0", "test_keyspace", "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil) - mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) - mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) +func setupMockWrangler(ctx context.Context, ctrl *gomock.Controller, keyspace string) *MockReshardingWrangler { + // It is hard to set the expect call using the workflow's context since + // it is only assigned when calling workflow.Run. Therefore, we only check + // the type of the context argument in expect calls. + managerCtx, mCancel := context.WithCancel(ctx) + workflowCtx, wCancel := context.WithCancel(managerCtx) + defer wCancel() + defer mCancel() - servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY, - topodatapb.TabletType_REPLICA, - topodatapb.TabletType_MASTER} - for _, servedType := range servedTypeParams { - mockWranglerInterface.EXPECT().MigrateServedTypes(ctx, "test_keyspace", "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil) - } - return mockWranglerInterface -} + ctxMatcher := typeMatcher{workflowCtx} -func setupMockWrangler(ctx context.Context, ctrl *gomock.Controller) *MockReshardingWrangler { mockWranglerInterface := NewMockReshardingWrangler(ctrl) // Set the expected behaviors for mock wrangler. - mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, "test_keyspace", "0", "test_keyspace", "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil) - mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctx, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, "test_keyspace", "0", "test_keyspace", "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil) + mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctxMatcher, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "-80", wrangler.DefaultWaitSlaveTimeout).Return(nil) + mockWranglerInterface.EXPECT().CopySchemaShardFromShard(ctxMatcher, nil /* tableArray*/, nil /* excludeTableArray */, true /*includeViews*/, keyspace, "0", keyspace, "80-", wrangler.DefaultWaitSlaveTimeout).Return(nil) - mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) - mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctx, "test_keyspace", "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) + mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctxMatcher, keyspace, "-80", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) + mockWranglerInterface.EXPECT().WaitForFilteredReplication(ctxMatcher, keyspace, "80-", wrangler.DefaultWaitForFilteredReplicationMaxDelay).Return(nil) servedTypeParams := []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER} for _, servedType := range servedTypeParams { - mockWranglerInterface.EXPECT().MigrateServedTypes(ctx, "test_keyspace", "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil) + mockWranglerInterface.EXPECT().MigrateServedTypes(ctxMatcher, keyspace, "0", nil /* cells */, servedType, false /* reverse */, false /* skipReFreshState */, wrangler.DefaultFilteredReplicationWaitTime).Return(nil) } return mockWranglerInterface } -func verifySuccess(t *testing.T, checkpoint *workflowpb.WorkflowCheckpoint) { - for _, task := range checkpoint.Tasks { - if task.State != workflowpb.TaskState_TaskDone || task.Error != "" { - t.Fatalf("task: %v should succeed: task status: %v, %v", task.Id, task.State, task.Error) +func setupTopology(ctx context.Context, t *testing.T, keyspace string) topo.Server { + ts := memorytopo.NewServer("cell") + if err := ts.CreateKeyspace(ctx, keyspace, &topodatapb.Keyspace{}); err != nil { + t.Errorf("CreateKeyspace: %v", err) + } + createShard(ctx, t, ts.Impl, keyspace, "0", true) + createShard(ctx, t, ts.Impl, keyspace, "-80", false) + createShard(ctx, t, ts.Impl, keyspace, "80-", false) + return ts +} + +func createShard(ctx context.Context, t *testing.T, ts topo.Impl, keyspace, name string, isServing bool) { + shard := &topodatapb.Shard{ + KeyRange: newKeyRange(name), + } + if isServing { + servedTypes := map[topodatapb.TabletType]bool{ + topodatapb.TabletType_MASTER: true, + topodatapb.TabletType_REPLICA: true, + topodatapb.TabletType_RDONLY: true, } + for st := range servedTypes { + shard.ServedTypes = append(shard.ServedTypes, &topodatapb.Shard_ServedType{ + TabletType: st, + }) + } + } + + if err := ts.CreateShard(ctx, keyspace, name, shard); err != nil { + t.Fatalf("CreateShard fails: %v", err) + } +} + +func newKeyRange(value string) *topodatapb.KeyRange { + _, result, err := topo.ValidateShardName(value) + if err != nil { + panic(fmt.Sprintf("BUG: invalid shard name to initialize key range: %v", err)) } + return result +} + +// typeMatcher implements the interface gomock.Matcher. +type typeMatcher struct { + x interface{} +} + +// Matches check whether the type of the variables are the same. +func (t typeMatcher) Matches(x interface{}) bool { + return fmt.Sprintf("%T", t.x) == fmt.Sprintf("%T", x) +} + +func (t typeMatcher) String() string { + return fmt.Sprintf("has the same type with %v", t.x) }