Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Sep 21, 2024
1 parent 53b4a77 commit c62d0d2
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 33 deletions.
26 changes: 22 additions & 4 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type testTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse
readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest
primaryPositions map[uint32]string
vdiffRequests map[uint32]*vdiffRequestResponse
Expand All @@ -289,7 +289,7 @@ func newTestTMClient(env *testEnv) *testTMClient {
return &testTMClient{
schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition),
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest),
readVReplicationWorkflowsResponses: make(map[string][]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse),
primaryPositions: make(map[uint32]string),
Expand All @@ -304,9 +304,12 @@ func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet
defer tmc.mu.Unlock()

if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, req) {
if !proto.Equal(expect.req, req) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
Expand Down Expand Up @@ -418,13 +421,22 @@ func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, q
}
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

tmc.createVReplicationWorkflowRequests[tabletID] = req
}

func (tmc *testTMClient) expectCreateVReplicationWorkflowRequestOnTargetTablets(req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

for _, tablet := range tmc.env.tablets[tmc.env.targetKeyspace.KeyspaceName] {
tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid] = req
}
}

func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down Expand Up @@ -479,6 +491,12 @@ type vdiffRequestResponse struct {
err error
}

type createVReplicationWorkflowRequestResponse struct {
req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest
res *tabletmanagerdatapb.CreateVReplicationWorkflowResponse
err error
}

func (tmc *testTMClient) expectVDiffRequest(tablet *topodatapb.Tablet, vrr *vdiffRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down
39 changes: 30 additions & 9 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -120,16 +121,33 @@ func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.M
if err == nil {
tableName = table.Name.String()
}
stmt, err := env.venv.Parser().ParseStrictDDL(ts.CreateDdl)
require.NoError(t, err)
ddl, ok := stmt.(*sqlparser.CreateTable)
require.True(t, ok)
cols := make([]string, len(ddl.TableSpec.Columns))
fields := make([]*querypb.Field, len(ddl.TableSpec.Columns))
for i, col := range ddl.TableSpec.Columns {
cols[i] = col.Name.String()
fields[i] = &querypb.Field{
Name: col.Name.String(),
Type: col.Type.SQLType(),
}
}
env.tmc.schema[ms.SourceKeyspace+"."+tableName] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: tableName,
Schema: fmt.Sprintf("%s_schema", tableName),
Name: tableName,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
env.tmc.schema[ms.TargetKeyspace+"."+ts.TargetTable] = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: ts.TargetTable,
Schema: fmt.Sprintf("%s_schema", ts.TargetTable),
Name: ts.TargetTable,
Schema: ts.CreateDdl,
Columns: cols,
Fields: fields,
}},
}
}
Expand Down Expand Up @@ -199,7 +217,7 @@ type testMaterializerTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse

// Used to confirm the number of times WorkflowDelete was called.
workflowDeleteCalls int
Expand All @@ -215,15 +233,18 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
sourceShards: sourceShards,
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
}
}

func (tmc *testMaterializerTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, request *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) {
if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil {
if !proto.Equal(expect, request) {
if expect.req != nil && !proto.Equal(expect.req, request) {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", request, expect)
}
if expect.res != nil {
return expect.res, expect.err
}
}
res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1")
return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil
Expand Down Expand Up @@ -315,7 +336,7 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) {
func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

Expand Down Expand Up @@ -344,7 +365,7 @@ func (tmc *testMaterializerTMClient) VReplicationExec(ctx context.Context, table

qrs := tmc.vrQueries[int(tablet.Alias.Uid)]
if len(qrs) == 0 {
return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query)
return nil, fmt.Errorf("tablet %v does not expect any more queries: %q", tablet, query)
}
matched := false
if qrs[0].query[0] == '/' {
Expand Down
103 changes: 86 additions & 17 deletions go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package workflow

import (
"context"
"errors"
"fmt"
"slices"
"strings"
Expand Down Expand Up @@ -2111,6 +2112,16 @@ func TestCreateLookupVindexFailures(t *testing.T) {
SourceKeyspace: "sourceks",
// Keyspace where the lookup table and VReplication workflow is created.
TargetKeyspace: "targetks",
TableSettings: []*vtctldatapb.TableMaterializeSettings{
{
TargetTable: "t1",
CreateDdl: "CREATE TABLE `t1` (\n`c1` INT,\n PRIMARY KEY(`c1`)\n)",
},
{
TargetTable: "t2",
CreateDdl: "CREATE TABLE `t2` (\n`c2` INT,\n PRIMARY KEY(`c2`)\n)",
},
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -2122,7 +2133,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Expand All @@ -2135,10 +2146,10 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"xxhash": {
Type: "xxhash",
},
"v": {
"v1": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
"write_only": "true",
Expand All @@ -2148,19 +2159,28 @@ func TestCreateLookupVindexFailures(t *testing.T) {
Tables: map[string]*vschemapb.Table{
"t1": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v",
Name: "v1",
Column: "c1",
}},
},
"t2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v2",
Column: "c2",
}},
},
},
}
err := env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs)
err := env.topoServ.SaveVSchema(ctx, ms.SourceKeyspace, vs)
require.NoError(t, err)
err = env.topoServ.SaveVSchema(ctx, ms.TargetKeyspace, vs)
require.NoError(t, err)

testcases := []struct {
description string
input *vschemapb.Keyspace
err string
description string
input *vschemapb.Keyspace
createRequest *createVReplicationWorkflowRequestResponse
err string
}{
{
description: "dup vindex",
Expand Down Expand Up @@ -2208,7 +2228,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1,c2",
"to": "c3",
},
Expand All @@ -2224,7 +2244,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"v": {
Type: "lookup",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Expand All @@ -2240,7 +2260,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"v": {
Type: "lookup_noexist",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1,c2",
"to": "c2",
},
Expand All @@ -2264,7 +2284,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Expand Down Expand Up @@ -2324,7 +2344,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"v": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Expand Down Expand Up @@ -2377,7 +2397,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
"xxhash": {
Type: "lookup_unique",
Params: map[string]string{
"table": "targetks.t",
"table": fmt.Sprintf("%s.t", ms.TargetKeyspace),
"from": "c1",
"to": "c2",
},
Expand All @@ -2393,7 +2413,7 @@ func TestCreateLookupVindexFailures(t *testing.T) {
},
},
},
err: "a conflicting vindex named xxhash already exists in the targetks keyspace",
err: fmt.Sprintf("a conflicting vindex named xxhash already exists in the %s keyspace", ms.TargetKeyspace),
},
{
description: "source table not in vschema",
Expand All @@ -2408,7 +2428,37 @@ func TestCreateLookupVindexFailures(t *testing.T) {
},
},
},
err: "table other not found in the targetks keyspace",
err: fmt.Sprintf("table other not found in the %s keyspace", ms.TargetKeyspace),
},
{
description: "workflow name already exists",
input: &vschemapb.Keyspace{
Vindexes: map[string]*vschemapb.Vindex{
"v2": {
Type: "consistent_lookup_unique",
Params: map[string]string{
"table": fmt.Sprintf("%s.t1_lkp", ms.TargetKeyspace),
"from": "c1",
"to": "keyspace_id",
},
},
},
Tables: map[string]*vschemapb.Table{
"t2": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Name: "v2",
Column: "c2",
}},
},
},
},
//vrQuery: "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)",
createRequest: &createVReplicationWorkflowRequestResponse{
req: nil,
res: &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{},
err: errors.New("we gots us an error"),
},
err: "we gots us an error",
},
}
for _, tcase := range testcases {
Expand All @@ -2418,10 +2468,26 @@ func TestCreateLookupVindexFailures(t *testing.T) {
Keyspace: ms.TargetKeyspace,
Vindex: tcase.input,
}
if tcase.createRequest != nil {
for _, tablet := range env.tablets {
if tablet.Keyspace == ms.TargetKeyspace {
env.tmc.expectVRQuery(int(tablet.Alias.Uid), "CREATE TABLE `t1_lkp` (\n`c1` INT,\n `keyspace_id` varbinary(128),\n PRIMARY KEY (`c1`)\n)", &sqltypes.Result{})
env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tcase.createRequest)
}
}
}
_, err := env.ws.LookupVindexCreate(ctx, req)
if !strings.Contains(err.Error(), tcase.err) {
t.Errorf("CreateLookupVindex(%s) err: %v, must contain %v", tcase.description, err, tcase.err)
}
// Confirm that the original vschema where the vindex would
// be created is still in place -- since the workflow
// creation failed in each test case. That vindex is created
// in the source keyspace based on the MaterializeSettings
// definition.
cvs, err := env.ws.ts.GetVSchema(ctx, ms.TargetKeyspace)
require.NoError(t, err)
require.True(t, proto.Equal(vs, cvs), "expected: %+v, got: %+v", vs, cvs)
})
}
}
Expand Down Expand Up @@ -2696,7 +2762,10 @@ func TestKeyRangesEqualOptimization(t *testing.T) {
if len(tc.moveTablesReq.SourceShards) > 0 && !slices.Contains(tc.moveTablesReq.SourceShards, tablet.Shard) {
continue
}
env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, tc.wantReqs[tablet.Alias.Uid])
reqRes := &createVReplicationWorkflowRequestResponse{
req: tc.wantReqs[tablet.Alias.Uid],
}
env.tmc.expectCreateVReplicationWorkflowRequest(tablet.Alias.Uid, reqRes)
}

mz := &materializer{
Expand Down
Loading

0 comments on commit c62d0d2

Please sign in to comment.