Skip to content

Commit

Permalink
Fixups for v19
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jun 28, 2024
1 parent 2603ff6 commit 48d2613
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 71 deletions.
56 changes: 0 additions & 56 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
Expand Down Expand Up @@ -429,60 +427,6 @@ func (tmc *testTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, r
}, nil
}

func (tmc *testTMClient) HasVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.HasVReplicationWorkflowsRequest) (*tabletmanagerdatapb.HasVReplicationWorkflowsResponse, error) {
return &tabletmanagerdatapb.HasVReplicationWorkflowsResponse{
Has: false,
}, nil
}

func (tmc *testTMClient) ReadVReplicationWorkflows(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowsRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables
if len(req.IncludeWorkflows) > 0 {
for _, wf := range req.IncludeWorkflows {
if strings.Contains(wf, "lookup") {
workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex
}
}
ks := tmc.env.sourceKeyspace
if tmc.reverse.Load() {
ks = tmc.env.targetKeyspace
}
return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{
Workflows: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse{
{
Workflow: req.IncludeWorkflows[0],
WorkflowType: workflowType,
Streams: []*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{
{
Id: 1,
State: binlogdatapb.VReplicationWorkflowState_Running,
Bls: &binlogdatapb.BinlogSource{
Keyspace: ks.KeyspaceName,
Shard: ks.ShardNames[0],
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "/.*/",
},
},
},
},
Pos: "MySQL56/" + position,
TimeUpdated: protoutil.TimeToProto(time.Now()),
TimeHeartbeat: protoutil.TimeToProto(time.Now()),
},
},
},
},
}, nil
} else {
return &tabletmanagerdatapb.ReadVReplicationWorkflowsResponse{}, nil
}
}

func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) {
return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{
Result: &querypb.QueryResult{
Expand Down
12 changes: 0 additions & 12 deletions go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package workflow
import (
"context"
"fmt"
"os"
"regexp"
"strconv"
"strings"
Expand All @@ -36,7 +35,6 @@ import (
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tmclient"

_flag "vitess.io/vitess/go/internal/flag"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand All @@ -45,11 +43,6 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type queryResult struct {
query string
result *querypb.QueryResult
}

type testMaterializerEnv struct {
ws *Server
ms *vtctldatapb.MaterializeSettings
Expand All @@ -66,11 +59,6 @@ type testMaterializerEnv struct {
//----------------------------------------------
// testMaterializerEnv

func TestMain(m *testing.M) {
_flag.ParseFlagsForTest()
os.Exit(m.Run())
}

func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv {
t.Helper()
env := &testMaterializerEnv{
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/materializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)

const position = "9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97"
const getWorkflowQuery = "select id from _vt.vreplication where db_name='vt_targetks' and workflow='workflow'"
const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'"
const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1"
Expand All @@ -53,7 +54,6 @@ const mzGetWorkflowStatusQuery = "select id, workflow, source, pos, stop_pos, ma
const mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1"
const mzGetLatestCopyState = "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)"
const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys\) values `
const eol = "$"

var (
defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String()
Expand Down
44 changes: 42 additions & 2 deletions go/vt/vtctl/workflow/resharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down Expand Up @@ -60,6 +61,17 @@ func TestReshardCreate(t *testing.T) {
},
}

var binlogSource = &binlogdatapb.BinlogSource{
Keyspace: sourceKeyspaceName,
Shard: "0",
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
},
}

testcases := []struct {
name string
sourceKeyspace, targetKeyspace *testKeyspace
Expand Down Expand Up @@ -141,11 +153,22 @@ func TestReshardCreate(t *testing.T) {

for i := range tc.sourceKeyspace.ShardNames {
tabletUID := startingSourceTabletUID + (tabletUIDStep * i)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s' and message != 'FROZEN'", targetKeyspaceName),
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
"select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1",
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied, tablet_types, cell from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'",
workflowName, targetKeyspaceName),
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
"select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)",
Expand All @@ -155,18 +178,35 @@ func TestReshardCreate(t *testing.T) {

for i, target := range tc.targetKeyspace.ShardNames {
tabletUID := startingTargetTabletUID + (tabletUIDStep * i)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", targetKeyspaceName),
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
insertPrefix+
`\('`+workflowName+`', 'keyspace:"`+targetKeyspaceName+`" shard:"0" filter:{rules:{match:"/.*" filter:"`+target+`"}}', '', [0-9]*, [0-9]*, '`+
env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false, '{}'\)`+eol,
`\('`+workflowName+`', 'keyspace:\\"`+targetKeyspaceName+`\\" shard:\\"0\\" filter:{rules:{match:\\"/.*\\" filter:\\"`+target+`\\"}}', '', [0-9]*, [0-9]*, '`+
env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false\)`+eol,
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
"select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1",
&sqltypes.Result{},
)
env.tmc.expectVRQuery(
tabletUID,
fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied, tablet_types, cell from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'",
workflowName, targetKeyspaceName),
sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|workflow|source|pos|stop_pos|max_replication_log|state|db_name|time_updated|transaction_timestamp|message|tags|workflow_type|workflow_sub_type|time_heartbeat|defer_secondary_keys|component_throttled|time_throttled|rows_copied|tablet_tuypes|cell",
"int64|varchar|blob|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|int64|int64|varchar|varchar",
),
fmt.Sprintf("1|%s|%s|MySQL56/%s|NULL|0|Running|vt_%s|1686577659|0|||1|0|0|0||0|10||", workflowName, binlogSource, position, sourceKeyspaceName),
),
)
env.tmc.expectVRQuery(
tabletUID,
"select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)",
Expand Down

0 comments on commit 48d2613

Please sign in to comment.