Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[workflow] Cleanup wrangler wrappers, migrate checkIfJournalExistsOnTablet to package workflow #8193

Merged
merged 2 commits into from
May 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"google.golang.org/protobuf/encoding/prototext"

"k8s.io/apimachinery/pkg/util/sets"

"vitess.io/vitess/go/sqltypes"
Expand All @@ -34,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vttime"
)
Expand Down Expand Up @@ -73,6 +73,37 @@ func NewServer(ts *topo.Server, tmc tmclient.TabletManagerClient) *Server {
}
}

// CheckReshardingJournalExistsOnTablet returns the journal (or an empty
// journal) and a boolean to indicate if the resharding_journal table exists on
// the given tablet.
//
// (TODO:@ajm188) This should not be part of the final public API, and should
// be un-exported after all places in package wrangler that call this have been
// migrated over.
func (s *Server) CheckReshardingJournalExistsOnTablet(ctx context.Context, tablet *topodatapb.Tablet, migrationID int64) (*binlogdatapb.Journal, bool, error) {
var (
journal binlogdatapb.Journal
exists bool
)

query := fmt.Sprintf("select val from _vt.resharding_journal where id=%v", migrationID)
p3qr, err := s.tmc.VReplicationExec(ctx, tablet, query)
if err != nil {
return nil, false, err
}

if len(p3qr.Rows) != 0 {
qr := sqltypes.Proto3ToResult(p3qr)
if err := prototext.Unmarshal(qr.Rows[0][0].ToBytes(), &journal); err != nil {
return nil, false, err
}

exists = true
}

return &journal, exists, nil
}

// GetWorkflows returns a list of all workflows that exist in a given keyspace,
// with some additional filtering depending on the request parameters (for
// example, ActiveOnly=true restricts the search to only workflows that are
Expand Down
165 changes: 165 additions & 0 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workflow

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

type fakeTMC struct {
tmclient.TabletManagerClient
vrepQueriesByTablet map[string]map[string]*querypb.QueryResult
}

func (fake *fakeTMC) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) {
alias := topoproto.TabletAliasString(tablet.Alias)
tabletQueries, ok := fake.vrepQueriesByTablet[alias]
if !ok {
return nil, fmt.Errorf("no query map registered on fake for %s", alias)
}

p3qr, ok := tabletQueries[query]
if !ok {
return nil, fmt.Errorf("no result on fake for query %q on tablet %s", query, alias)
}

return p3qr, nil
}

func TestCheckReshardingJournalExistsOnTablet(t *testing.T) {
t.Parallel()

ctx := context.Background()
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone1",
Uid: 100,
},
}
journal := &binlogdatapb.Journal{
Id: 1,
MigrationType: binlogdatapb.MigrationType_SHARDS,
Tables: []string{"t1", "t2"},
}
journalBytes, err := prototext.Marshal(journal)
require.NoError(t, err, "could not marshal journal %+v into bytes", journal)

// get some bytes that will fail to unmarshal into a binlogdatapb.Journal
tabletBytes, err := prototext.Marshal(tablet)
require.NoError(t, err, "could not marshal tablet %+v into bytes", tablet)

p3qr := sqltypes.ResultToProto3(sqltypes.MakeTestResult([]*querypb.Field{
{
Name: "val",
Type: querypb.Type_BLOB,
},
}, string(journalBytes)))

tests := []struct {
name string
tablet *topodatapb.Tablet
result *querypb.QueryResult
journal *binlogdatapb.Journal
shouldExist bool
shouldErr bool
}{
{
name: "journal exists",
tablet: tablet,
result: p3qr,
shouldExist: true,
journal: journal,
},
{
name: "journal does not exist",
tablet: tablet,
result: sqltypes.ResultToProto3(sqltypes.MakeTestResult(nil)),
journal: &binlogdatapb.Journal{},
shouldExist: false,
},
{
name: "cannot unmarshal into journal",
tablet: tablet,
result: sqltypes.ResultToProto3(sqltypes.MakeTestResult([]*querypb.Field{
{
Name: "val",
Type: querypb.Type_BLOB,
},
}, string(tabletBytes))),
shouldErr: true,
},
{
name: "VReplicationExec fails on tablet",
tablet: &topodatapb.Tablet{ // Here we use a different tablet to force the fake to return an error
Alias: &topodatapb.TabletAlias{
Cell: "zone2",
Uid: 200,
},
},
shouldErr: true,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

tmc := &fakeTMC{
vrepQueriesByTablet: map[string]map[string]*querypb.QueryResult{
topoproto.TabletAliasString(tablet.Alias): { // always use the tablet shared by these tests cases
"select val from _vt.resharding_journal where id=1": tt.result,
},
},
}

ws := NewServer(nil, tmc)
journal, exists, err := ws.CheckReshardingJournalExistsOnTablet(ctx, tt.tablet, 1)
if tt.shouldErr {
assert.Error(t, err)
return
}

require.NoError(t, err)

existAssertionMsg := "expected journal to "
if tt.shouldExist {
existAssertionMsg += "already exist on tablet"
} else {
existAssertionMsg += "not exist"
}

assert.Equal(t, tt.shouldExist, exists, existAssertionMsg)
utils.MustMatch(t, tt.journal, journal, "journal in resharding_journal did not match")
})
}
}
18 changes: 12 additions & 6 deletions go/vt/wrangler/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtctl/workflow"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"context"
Expand Down Expand Up @@ -158,7 +160,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
if externalTopo == nil {
// Save routing rules before vschema. If we save vschema first, and routing rules
// fails to save, we may generate duplicate table errors.
rules, err := wr.getRoutingRules(ctx)
rules, err := topotools.GetRoutingRules(ctx, wr.ts)
if err != nil {
return err
}
Expand All @@ -174,7 +176,7 @@ func (wr *Wrangler) MoveTables(ctx context.Context, workflow, sourceKeyspace, ta
rules[sourceKeyspace+"."+table+"@replica"] = toSource
rules[sourceKeyspace+"."+table+"@rdonly"] = toSource
}
if err := wr.saveRoutingRules(ctx, rules); err != nil {
if err := topotools.SaveRoutingRules(ctx, wr.ts, rules); err != nil {
return err
}
if vschema != nil {
Expand Down Expand Up @@ -311,9 +313,13 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi
return allErrors.AggrError(vterrors.Aggregate)
}

var mu sync.Mutex
var exists bool
var tablets []string
var (
mu sync.Mutex
exists bool
tablets []string
ws = workflow.NewServer(wr.ts, wr.tmc)
)

err := forAllSources(func(si *topo.ShardInfo) error {
tablet, err := wr.ts.GetTablet(ctx, si.MasterAlias)
if err != nil {
Expand All @@ -322,7 +328,7 @@ func (wr *Wrangler) checkIfPreviousJournalExists(ctx context.Context, mz *materi
if tablet == nil {
return nil
}
_, exists, err = wr.checkIfJournalExistsOnTablet(ctx, tablet.Tablet, migrationID)
_, exists, err = ws.CheckReshardingJournalExistsOnTablet(ctx, tablet.Tablet, migrationID)
if err != nil {
return err
}
Expand Down
Loading