Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vreplication: vdiff #5367

Merged
merged 28 commits into from
Nov 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
2083ace
mysqlctl: GetSchema also returns field info
sougou Sep 27, 2019
ee052a5
vdiff: planbuilder initial cut
sougou Sep 29, 2019
8b6eb63
vdiff: add a test
sougou Sep 29, 2019
cd858ac
vdiff: resultStreamer
sougou Sep 29, 2019
0f608e9
vdiff: resultStreamer test
sougou Oct 14, 2019
c6994e5
vdiff: stopTargetStreams, waitForSourceStreams
sougou Oct 6, 2019
c581f0e
discovery: move tabletpicker from vreplication
sougou Oct 6, 2019
c6b7e30
vreplication: use discovery.TabletPicker
sougou Oct 6, 2019
d6063d9
discovery: refactor TabletPicker
sougou Oct 6, 2019
3bdafde
discovery: improve TabletPicker algorithm
sougou Oct 19, 2019
297c5d1
vdiff: new data structures
sougou Oct 6, 2019
2ef2962
tabletmanager: WaitForPosition
sougou Oct 7, 2019
ea2a5c0
vdiff: selectTablets
sougou Oct 7, 2019
d37fd4f
vdiff: streamFromSources
sougou Oct 7, 2019
3a9aa89
vdiff: syncTargets and restartTargets
sougou Oct 8, 2019
51ce2c5
vdiff: resultReader that uses engine.MergeSort
sougou Oct 10, 2019
798fedb
vdiff: diff function
sougou Oct 12, 2019
8e231da
vdiff: main function and start of testing
sougou Oct 13, 2019
65a0051
vdiff: all plan builder tests
sougou Oct 13, 2019
38c2ae9
vdiff: test framework with a basic test
sougou Oct 14, 2019
7139502
vdiff: rest of the tests
sougou Oct 14, 2019
3891247
vdiff: filteredReplicationWaitTime
sougou Oct 16, 2019
78babdd
vdiff: vtctl command
sougou Oct 16, 2019
98429a9
vdiff: fix two bugs
sougou Oct 17, 2019
357d070
vdiff: handle aggregates
sougou Oct 19, 2019
81afa30
vreplication: re-introduce healthcheck timeouts
sougou Oct 28, 2019
7d8a08d
vdiff: post-rebase fix
sougou Nov 3, 2019
1b8bece
vdiff: fix broken tabletmanagerdata_pb2.py
sougou Nov 3, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 34 additions & 15 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,22 @@ func (l *listener) StatsUpdate(ts *TabletStats) {
l.output <- ts
}

type fakeConn struct {
queryservice.QueryService
tablet *topodatapb.Tablet
// If fixedResult is set, the channels are not used.
fixedResult *querypb.StreamHealthResponse
// hcChan should be an unbuffered channel which holds the tablet's next health response.
hcChan chan *querypb.StreamHealthResponse
// errCh is either an unbuffered channel which holds the stream error to return, or nil.
errCh chan error
// cbErrCh is a channel which receives errors returned from the supplied callback.
cbErrCh chan error

mu sync.Mutex
canceled bool
}

func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthResponse) *fakeConn {
key := TabletToMapKey(tablet)
conn := &fakeConn{
Expand All @@ -668,35 +684,38 @@ func createFakeConn(tablet *topodatapb.Tablet, c chan *querypb.StreamHealthRespo
return conn
}

func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.StreamHealthResponse) *fakeConn {
key := TabletToMapKey(tablet)
return connMap[key], nil
conn := &fakeConn{
QueryService: fakes.ErrorQueryService,
tablet: tablet,
fixedResult: fixedResult,
}
connMap[key] = conn
return conn
}

type fakeConn struct {
queryservice.QueryService
tablet *topodatapb.Tablet
// hcChan should be an unbuffered channel which holds the tablet's next health response.
hcChan chan *querypb.StreamHealthResponse
// errCh is either an unbuffered channel which holds the stream error to return, or nil.
errCh chan error
// cbErrCh is a channel which receives errors returned from the supplied callback.
cbErrCh chan error

mu sync.Mutex
canceled bool
func discoveryDialer(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) {
key := TabletToMapKey(tablet)
return connMap[key], nil
}

// StreamHealth implements queryservice.QueryService.
func (fc *fakeConn) StreamHealth(ctx context.Context, callback func(shr *querypb.StreamHealthResponse) error) error {
if fc.fixedResult != nil {
return callback(fc.fixedResult)
}
for {
select {
case shr := <-fc.hcChan:
if err := callback(shr); err != nil {
if err == io.EOF {
return nil
}
fc.cbErrCh <- err
select {
case fc.cbErrCh <- err:
case <-ctx.Done():
}
return err
}
case err := <-fc.errCh:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,55 +14,46 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication
package discovery

import (
"flag"
"fmt"
"math/rand"
"time"

"vitess.io/vitess/go/vt/vterrors"

"golang.org/x/net/context"

"vitess.io/vitess/go/vt/discovery"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

var (
healthCheckTopologyRefresh = flag.Duration("vreplication_healthcheck_topology_refresh", 30*time.Second, "refresh interval for re-reading the topology")
healthcheckRetryDelay = flag.Duration("vreplication_healthcheck_retry_delay", 5*time.Second, "delay before retrying a failed healthcheck")
healthCheckTimeout = flag.Duration("vreplication_healthcheck_timeout", time.Minute, "the health check timeout period")
"vitess.io/vitess/go/vt/vterrors"
)

type tabletPicker struct {
// TabletPicker gives a simplified API for picking tablets.
type TabletPicker struct {
ts *topo.Server
cell string
keyspace string
shard string
tabletTypes []topodatapb.TabletType

healthCheck discovery.HealthCheck
watcher *discovery.TopologyWatcher
statsCache *discovery.TabletStatsCache
healthCheck HealthCheck
watcher *TopologyWatcher
statsCache *TabletStatsCache
}

func newTabletPicker(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypesStr string) (*tabletPicker, error) {
// NewTabletPicker returns a TabletPicker.
func NewTabletPicker(ctx context.Context, ts *topo.Server, cell, keyspace, shard, tabletTypesStr string, healthcheckTopologyRefresh, healthcheckRetryDelay, healthcheckTimeout time.Duration) (*TabletPicker, error) {
tabletTypes, err := topoproto.ParseTabletTypes(tabletTypesStr)
if err != nil {
return nil, fmt.Errorf("failed to parse list of tablet types: %v", tabletTypesStr)
}

// These have to be initialized in the following sequence (watcher must be last).
healthCheck := discovery.NewHealthCheck(*healthcheckRetryDelay, *healthCheckTimeout)
statsCache := discovery.NewTabletStatsCache(healthCheck, ts, cell)
watcher := discovery.NewShardReplicationWatcher(ctx, ts, healthCheck, cell, keyspace, shard, *healthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency)
healthCheck := NewHealthCheck(healthcheckRetryDelay, healthcheckTimeout)
statsCache := NewTabletStatsCache(healthCheck, ts, cell)
watcher := NewShardReplicationWatcher(ctx, ts, healthCheck, cell, keyspace, shard, healthcheckTopologyRefresh, DefaultTopoReadConcurrency)

return &tabletPicker{
return &TabletPicker{
ts: ts,
cell: cell,
keyspace: keyspace,
Expand All @@ -74,27 +65,28 @@ func newTabletPicker(ctx context.Context, ts *topo.Server, cell, keyspace, shard
}, nil
}

func (tp *tabletPicker) Pick(ctx context.Context) (*topodatapb.Tablet, error) {
// PickForStreaming picks all healthy tablets including the non-serving ones.
func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Tablet, error) {
// wait for any of required the tablets (useful for the first run at least, fast for next runs)
if err := tp.statsCache.WaitForAnyTablet(ctx, tp.cell, tp.keyspace, tp.shard, tp.tabletTypes); err != nil {
if err := tp.statsCache.WaitByFilter(ctx, tp.keyspace, tp.shard, tp.tabletTypes, RemoveUnhealthyTablets); err != nil {
return nil, vterrors.Wrapf(err, "error waiting for tablets for %v %v %v", tp.cell, tp.keyspace, tp.shard)
}

// Find the server list from the health check.
// Note: We cannot use statsCache.GetHealthyTabletStats() here because it does
// not return non-serving tablets. We must include non-serving tablets because
// some tablets may not be serving if their traffic was already migrated to the
// destination shards.
// Refilter the tablets list based on the same criteria.
var addrs []TabletStats
for _, tabletType := range tp.tabletTypes {
addrs := discovery.RemoveUnhealthyTablets(tp.statsCache.GetTabletStats(tp.keyspace, tp.shard, tabletType))
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))].Tablet, nil
}
list := RemoveUnhealthyTablets(tp.statsCache.GetTabletStats(tp.keyspace, tp.shard, tabletType))
addrs = append(addrs, list...)
}
if len(addrs) > 0 {
return addrs[rand.Intn(len(addrs))].Tablet, nil
}
// Unreachable.
return nil, fmt.Errorf("can't find any healthy source tablet for %v %v %v", tp.keyspace, tp.shard, tp.tabletTypes)
}

func (tp *tabletPicker) Close() {
// Close shuts down TabletPicker.
func (tp *TabletPicker) Close() {
tp.watcher.Stop()
tp.healthCheck.Close()
}
Expand Down
175 changes: 175 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
Copyright 2019 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
)

func TestPickSimple(t *testing.T) {
te := newPickerTestEnv(t)
want := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true)
defer deleteTablet(te, want)

tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica", 1*time.Second, 1*time.Second, 1*time.Minute)
require.NoError(t, err)
defer tp.Close()

tablet, err := tp.PickForStreaming(context.Background())
require.NoError(t, err)
if !proto.Equal(want, tablet) {
t.Errorf("Pick: %v, want %v", tablet, want)
}
}

func TestPickFromTwoHealthy(t *testing.T) {
te := newPickerTestEnv(t)
want1 := addTablet(te, 100, topodatapb.TabletType_REPLICA, true, true)
defer deleteTablet(te, want1)
want2 := addTablet(te, 101, topodatapb.TabletType_RDONLY, true, true)
defer deleteTablet(te, want2)

tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly", 1*time.Second, 1*time.Second, 1*time.Minute)
require.NoError(t, err)
defer tp.Close()

// In 20 attempts, both tablet types must be picked at least once.
var picked1, picked2 bool
for i := 0; i < 20; i++ {
tablet, err := tp.PickForStreaming(context.Background())
require.NoError(t, err)
if proto.Equal(tablet, want1) {
picked1 = true
}
if proto.Equal(tablet, want2) {
picked2 = true
}
}
assert.True(t, picked1)
assert.True(t, picked2)
}

func TestPickFromSomeUnhealthy(t *testing.T) {
te := newPickerTestEnv(t)
defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false))
want := addTablet(te, 101, topodatapb.TabletType_RDONLY, false, true)
defer deleteTablet(te, want)

tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly", 1*time.Second, 1*time.Second, 1*time.Minute)
require.NoError(t, err)
defer tp.Close()

tablet, err := tp.PickForStreaming(context.Background())
require.NoError(t, err)
if !proto.Equal(tablet, want) {
t.Errorf("Pick:\n%v, want\n%v", tablet, want)
}
}

func TestPickError(t *testing.T) {
te := newPickerTestEnv(t)
defer deleteTablet(te, addTablet(te, 100, topodatapb.TabletType_REPLICA, false, false))

_, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "badtype", 1*time.Second, 1*time.Second, 1*time.Minute)
assert.EqualError(t, err, "failed to parse list of tablet types: badtype")

tp, err := NewTabletPicker(context.Background(), te.topoServ, te.cell, te.keyspace, te.shard, "replica,rdonly", 1*time.Second, 1*time.Second, 1*time.Minute)
require.NoError(t, err)
defer tp.Close()

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
_, err = tp.PickForStreaming(ctx)
require.Error(t, err)
assert.Contains(t, err.Error(), "error waiting for tablets")
}

type pickerTestEnv struct {
t *testing.T
keyspace string
shard string
cell string

topoServ *topo.Server
}

func newPickerTestEnv(t *testing.T) *pickerTestEnv {
ctx := context.Background()

te := &pickerTestEnv{
t: t,
keyspace: "ks",
shard: "0",
cell: "cell",
topoServ: memorytopo.NewServer("cell"),
}
err := te.topoServ.CreateKeyspace(ctx, te.keyspace, &topodatapb.Keyspace{})
require.NoError(t, err)
err = te.topoServ.CreateShard(ctx, te.keyspace, te.shard)
require.NoError(t, err)
return te
}

func addTablet(te *pickerTestEnv, id int, tabletType topodatapb.TabletType, serving, healthy bool) *topodatapb.Tablet {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: te.cell,
Uid: uint32(id),
},
Keyspace: te.keyspace,
Shard: te.shard,
KeyRange: &topodatapb.KeyRange{},
Type: tabletType,
PortMap: map[string]int32{
"test": int32(id),
},
}
err := te.topoServ.CreateTablet(context.Background(), tablet)
require.NoError(te.t, err)

var herr string
if !healthy {
herr = "err"
}
_ = createFixedHealthConn(tablet, &querypb.StreamHealthResponse{
Serving: serving,
Target: &querypb.Target{
Keyspace: te.keyspace,
Shard: te.shard,
TabletType: tabletType,
},
RealtimeStats: &querypb.RealtimeStats{HealthError: herr},
})

return tablet
}

func deleteTablet(te *pickerTestEnv, tablet *topodatapb.Tablet) {
te.topoServ.DeleteTablet(context.Background(), tablet.Alias)
// This is not automatically removed from shard replication, which results in log spam.
topo.DeleteTabletReplicationData(context.Background(), te.topoServ, tablet)
}
Loading