Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: throttle on target tablet #7364

Merged
merged 19 commits into from
Jan 27, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func main() {
DBConfigs: config.DB.Clone(),
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
}
if err := tm.Start(tablet, config.Healthcheck.IntervalSeconds.Get()); err != nil {
log.Exitf("failed to parse -tablet-path or initialize DB credentials: %v", err)
Expand Down
124 changes: 98 additions & 26 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ import (
)

var (
vc *VitessCluster
vtgate *cluster.VtgateProcess
defaultCell *Cell
vtgateConn *mysql.Conn
defaultRdonly int
defaultReplicas int
allCellNames string
httpClient = throttlebase.SetupHTTPClient(time.Second)
throttlerAppName = "vstreamer"
vc *VitessCluster
vtgate *cluster.VtgateProcess
defaultCell *Cell
vtgateConn *mysql.Conn
defaultRdonly int
defaultReplicas int
allCellNames string
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = "vstreamer"
targetThrottlerAppName = "vreplication"
)

func init() {
Expand All @@ -63,16 +64,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (resp *http.
return resp, respBody, err
}

func throttleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerAppName))
func throttleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
}

func unthrottleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerAppName))
func unthrottleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
}

func throttlerCheckSelf(tablet *cluster.VttabletProcess) (resp *http.Response, respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerAppName)
func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http.Response, respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app)
resp, err = httpClient.Get(apiURL)
if err != nil {
return resp, respBody, err
Expand Down Expand Up @@ -107,6 +108,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
materializeRollup(t)

shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName)

validateRollupReplicates(t)
shardOrders(t)
shardMerchant(t)
Expand Down Expand Up @@ -221,11 +223,16 @@ func insertMoreProducts(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", sql)
}

func insertMoreProductsForThrottler(t *testing.T) {
func insertMoreProductsForSourceThrottler(t *testing.T) {
sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
}

func insertMoreProductsForTargetThrottler(t *testing.T) {
sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
}

func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) {
t.Run("shardCustomer", func(t *testing.T) {
workflow := "p2c"
Expand All @@ -241,6 +248,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil {
t.Fatal(err)
}

tables := "customer"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

Expand Down Expand Up @@ -593,41 +601,105 @@ func materializeProduct(t *testing.T) {
}

productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "master")
t.Run("throttle-app", func(t *testing.T) {
t.Run("throttle-app: product (source)", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
for _, tab := range productTablets {
_, body, err := throttleStreamer(tab)
_, body, err := throttleStreamer(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, sourceThrottlerAppName)
}
// Wait for throttling to take effect (caching will expire by this time):
time.Sleep(1 * time.Second)
for _, tab := range productTablets {
_, body, err := throttlerCheckSelf(tab)
assert.NoError(t, err)
assert.Contains(t, body, "417")
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "417")
}
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
insertMoreProductsForThrottler(t)
insertMoreProductsForSourceThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
time.Sleep(1 * time.Second)
// we expect the additional rows to **not appear** in the materialized view
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 5)
}
})
t.Run("unthrottle-app", func(t *testing.T) {
t.Run("unthrottle-app: product (source)", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
for _, tab := range productTablets {
_, body, err := unthrottleStreamer(tab)
_, body, err := unthrottleStreamer(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, sourceThrottlerAppName)
}
// give time for unthrottling to take effect and for target to fetch data
time.Sleep(3 * time.Second)
for _, tab := range productTablets {
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 8)
}
})

t.Run("throttle-app: customer (target)", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
for _, tab := range customerTablets {
_, body, err := throttleStreamer(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
// Wait for throttling to take effect (caching will expire by this time):
time.Sleep(1 * time.Second)
for _, tab := range customerTablets {
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "417")
}
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
insertMoreProductsForTargetThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
time.Sleep(1 * time.Second)
// we expect the additional rows to **not appear** in the materialized view
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 8)
}
})
t.Run("unthrottle-app: customer (target)", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
for _, tab := range customerTablets {
_, body, err := unthrottleStreamer(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
// give time for unthrottling to take effect and for target to fetch data
time.Sleep(3 * time.Second)
for _, tab := range customerTablets {
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 11)
}
})
})
}

Expand Down
80 changes: 64 additions & 16 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,31 @@ limitations under the License.
package vreplication

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"sort"
"sync"
"time"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/withddl"

"context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/withddl"
)

const (
Expand All @@ -65,6 +65,17 @@ const (

var withDDL *withddl.WithDDL

const (
throttleCheckDuration = 250 * time.Millisecond
throttlerAppName = "vreplication"
)

var (
throttleFlags = &throttle.CheckFlags{
LowPriority: true,
}
)

func init() {
allddls := append([]string{}, binlogplayer.CreateVReplicationTable()...)
allddls = append(allddls, binlogplayer.AlterVReplicationTable...)
Expand Down Expand Up @@ -111,6 +122,9 @@ type Engine struct {

journaler map[string]*journalEvent
ec *externalConnector

lagThrottler *throttle.Throttler
lastSuccessfulThrottleCheck time.Time
}

type journalEvent struct {
Expand All @@ -121,14 +135,15 @@ type journalEvent struct {

// NewEngine creates a new Engine.
// A nil ts means that the Engine is disabled.
func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon) *Engine {
func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, lagThrottler *throttle.Throttler) *Engine {
vre := &Engine{
controllers: make(map[int]*controller),
ts: ts,
cell: cell,
mysqld: mysqld,
journaler: make(map[string]*journalEvent),
ec: newExternalConnector(config.ExternalConnections),
controllers: make(map[int]*controller),
ts: ts,
cell: cell,
mysqld: mysqld,
journaler: make(map[string]*journalEvent),
ec: newExternalConnector(config.ExternalConnections),
lagThrottler: lagThrottler,
}
return vre
}
Expand Down Expand Up @@ -288,6 +303,39 @@ func (vre *Engine) Close() {
log.Infof("VReplication Engine: closed")
}

// throttleStatusOK checks if the throttler is happy with a Check (a shard-scope check).
// If not, and `sleep == true`, impose some sleep.
func (vre *Engine) throttleStatusOK(ctx context.Context, sleep bool) bool {
if vre.lagThrottler == nil {
// no throttler
return true
}
if time.Since(vre.lastSuccessfulThrottleCheck) <= throttleCheckDuration {
// if last check was OK just very recently there is no need to check again
return true
}
// It's time to run a throttler check
checkResult := vre.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags)
if checkResult.StatusCode != http.StatusOK {
// sorry, we got throttled.
if sleep {
time.Sleep(throttleCheckDuration)
}
return false
}
vre.lastSuccessfulThrottleCheck = time.Now()
return true
}

// throttle will wait until the throttler's "check-self" check is satisfied
func (vre *Engine) throttle(ctx context.Context) {
// We introduce throttling based on the tablet's "self" check, which means if the tablet itself is lagging,
// we hold off reads so as to ease the load and let it regain its health
for !vre.throttleStatusOK(ctx, true) {
// Sorry, got throttled. Sleep some time, then check again
}
}

// Exec executes the query and the related actions.
// Example insert statement:
// insert into _vt.vreplication
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,18 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
var updateCopyState *sqlparser.ParsedQuery
var bv map[string]*querypb.BindVariable
err = vc.vr.sourceVStreamer.VStreamRows(ctx, initialPlan.SendRule.Filter, lastpkpb, func(rows *binlogdatapb.VStreamRowsResponse) error {
select {
case <-ctx.Done():
return io.EOF
default:
for {
select {
case <-ctx.Done():
return io.EOF
default:
}
// verify throttler is happy, otherwise keep looping
if vc.vr.vre.throttleStatusOK(ctx, true) {
break
}
}

if vc.tablePlan == nil {
if len(rows.Fields) == 0 {
return fmt.Errorf("expecting field event first, got: %v", rows)
Expand All @@ -249,6 +256,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
if len(rows.Rows) == 0 {
return nil
}

// The number of rows we receive depends on the packet size set
// for the row streamer. Since the packet size is roughly equivalent
// to data size, this should map to a uniform amount of pages affected
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
defer vp.vr.stats.SecondsBehindMaster.Set(math.MaxInt64)
var sbm int64 = -1
for {
// check throttler. If required throttling, sleep ("true" argument) and retry loop
if !vp.vr.vre.throttleStatusOK(ctx, true) {
continue
}

items, err := relay.Fetch()
if err != nil {
return err
Expand Down