diff --git a/go/cmd/vttablet/vttablet.go b/go/cmd/vttablet/vttablet.go index 135f3cd3191..d8777ce84f5 100644 --- a/go/cmd/vttablet/vttablet.go +++ b/go/cmd/vttablet/vttablet.go @@ -103,7 +103,7 @@ func main() { DBConfigs: config.DB.Clone(), QueryServiceControl: qsc, UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()), - VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld), + VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()), } if err := tm.Start(tablet, config.Healthcheck.IntervalSeconds.Get()); err != nil { log.Exitf("failed to parse -tablet-path or initialize DB credentials: %v", err) diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 5a3b9cf0365..5cf1078344e 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -36,15 +36,16 @@ import ( ) var ( - vc *VitessCluster - vtgate *cluster.VtgateProcess - defaultCell *Cell - vtgateConn *mysql.Conn - defaultRdonly int - defaultReplicas int - allCellNames string - httpClient = throttlebase.SetupHTTPClient(time.Second) - throttlerAppName = "vstreamer" + vc *VitessCluster + vtgate *cluster.VtgateProcess + defaultCell *Cell + vtgateConn *mysql.Conn + defaultRdonly int + defaultReplicas int + allCellNames string + httpClient = throttlebase.SetupHTTPClient(time.Second) + sourceThrottlerAppName = "vstreamer" + targetThrottlerAppName = "vreplication" ) func init() { @@ -63,16 +64,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (resp *http. return resp, respBody, err } -func throttleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerAppName)) +func throttleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app)) } -func unthrottleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerAppName)) +func unthrottleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) { + return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app)) } -func throttlerCheckSelf(tablet *cluster.VttabletProcess) (resp *http.Response, respBody string, err error) { - apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerAppName) +func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http.Response, respBody string, err error) { + apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app) resp, err = httpClient.Get(apiURL) if err != nil { return resp, respBody, err @@ -107,6 +108,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) { materializeRollup(t) shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName) + validateRollupReplicates(t) shardOrders(t) shardMerchant(t) @@ -221,11 +223,16 @@ func insertMoreProducts(t *testing.T) { execVtgateQuery(t, vtgateConn, "product", sql) } -func insertMoreProductsForThrottler(t *testing.T) { +func insertMoreProductsForSourceThrottler(t *testing.T) { sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');" execVtgateQuery(t, vtgateConn, "product", sql) } +func insertMoreProductsForTargetThrottler(t *testing.T) { + sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');" + execVtgateQuery(t, vtgateConn, "product", sql) +} + func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) { t.Run("shardCustomer", func(t *testing.T) { workflow := "p2c" @@ -241,6 +248,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil { t.Fatal(err) } + tables := "customer" moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables) @@ -593,21 +601,28 @@ func materializeProduct(t *testing.T) { } productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "master") - t.Run("throttle-app", func(t *testing.T) { + t.Run("throttle-app-product", func(t *testing.T) { // Now, throttle the streamer on source tablets, insert some rows for _, tab := range productTablets { - _, body, err := throttleStreamer(tab) + _, body, err := throttleStreamer(tab, sourceThrottlerAppName) assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) + assert.Contains(t, body, sourceThrottlerAppName) } // Wait for throttling to take effect (caching will expire by this time): time.Sleep(1 * time.Second) for _, tab := range productTablets { - _, body, err := throttlerCheckSelf(tab) - assert.NoError(t, err) - assert.Contains(t, body, "417") + { + _, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, "417") + } + { + _, body, err := throttlerCheckSelf(tab, targetThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, "200") + } } - insertMoreProductsForThrottler(t) + insertMoreProductsForSourceThrottler(t) // To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place, time.Sleep(1 * time.Second) // we expect the additional rows to **not appear** in the materialized view @@ -615,19 +630,76 @@ func materializeProduct(t *testing.T) { validateCountInTablet(t, tab, keyspace, workflow, 5) } }) - t.Run("unthrottle-app", func(t *testing.T) { + t.Run("unthrottle-app-product", func(t *testing.T) { // unthrottle on source tablets, and expect the rows to show up for _, tab := range productTablets { - _, body, err := unthrottleStreamer(tab) + _, body, err := unthrottleStreamer(tab, sourceThrottlerAppName) assert.NoError(t, err) - assert.Contains(t, body, throttlerAppName) + assert.Contains(t, body, sourceThrottlerAppName) } // give time for unthrottling to take effect and for target to fetch data time.Sleep(3 * time.Second) + for _, tab := range productTablets { + { + _, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, "200") + } + } for _, tab := range customerTablets { validateCountInTablet(t, tab, keyspace, workflow, 8) } }) + + t.Run("throttle-app-customer", func(t *testing.T) { + // Now, throttle the streamer on source tablets, insert some rows + for _, tab := range customerTablets { + _, body, err := throttleStreamer(tab, targetThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, targetThrottlerAppName) + } + // Wait for throttling to take effect (caching will expire by this time): + time.Sleep(1 * time.Second) + for _, tab := range customerTablets { + { + _, body, err := throttlerCheckSelf(tab, targetThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, "417") + } + { + _, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, "200") + } + } + insertMoreProductsForTargetThrottler(t) + // To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place, + time.Sleep(1 * time.Second) + // we expect the additional rows to **not appear** in the materialized view + for _, tab := range customerTablets { + validateCountInTablet(t, tab, keyspace, workflow, 8) + } + }) + t.Run("unthrottle-app-customer", func(t *testing.T) { + // unthrottle on source tablets, and expect the rows to show up + for _, tab := range customerTablets { + _, body, err := unthrottleStreamer(tab, targetThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, targetThrottlerAppName) + } + // give time for unthrottling to take effect and for target to fetch data + time.Sleep(3 * time.Second) + for _, tab := range customerTablets { + { + _, body, err := throttlerCheckSelf(tab, targetThrottlerAppName) + assert.NoError(t, err) + assert.Contains(t, body, "200") + } + } + for _, tab := range customerTablets { + validateCountInTablet(t, tab, keyspace, workflow, 11) + } + }) }) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine.go b/go/vt/vttablet/tabletmanager/vreplication/engine.go index 406b49e8e55..2c7939b705b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/engine.go +++ b/go/vt/vttablet/tabletmanager/vreplication/engine.go @@ -17,6 +17,7 @@ limitations under the License. package vreplication import ( + "context" "errors" "flag" "fmt" @@ -24,24 +25,22 @@ import ( "sync" "time" - "vitess.io/vitess/go/sync2" - "vitess.io/vitess/go/vt/dbconfigs" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vtgate/evalengine" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/withddl" - - "context" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/withddl" ) const ( @@ -65,6 +64,10 @@ const ( var withDDL *withddl.WithDDL +const ( + throttlerAppName = "vreplication" +) + func init() { allddls := append([]string{}, binlogplayer.CreateVReplicationTable()...) allddls = append(allddls, binlogplayer.AlterVReplicationTable...) @@ -111,6 +114,8 @@ type Engine struct { journaler map[string]*journalEvent ec *externalConnector + + throttlerClient *throttle.Client } type journalEvent struct { @@ -121,14 +126,15 @@ type journalEvent struct { // NewEngine creates a new Engine. // A nil ts means that the Engine is disabled. -func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon) *Engine { +func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, lagThrottler *throttle.Throttler) *Engine { vre := &Engine{ - controllers: make(map[int]*controller), - ts: ts, - cell: cell, - mysqld: mysqld, - journaler: make(map[string]*journalEvent), - ec: newExternalConnector(config.ExternalConnections), + controllers: make(map[int]*controller), + ts: ts, + cell: cell, + mysqld: mysqld, + journaler: make(map[string]*journalEvent), + ec: newExternalConnector(config.ExternalConnections), + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckSelf), } return vre } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 43272b83cb2..69e220186f4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -221,11 +221,18 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma var updateCopyState *sqlparser.ParsedQuery var bv map[string]*querypb.BindVariable err = vc.vr.sourceVStreamer.VStreamRows(ctx, initialPlan.SendRule.Filter, lastpkpb, func(rows *binlogdatapb.VStreamRowsResponse) error { - select { - case <-ctx.Done(): - return io.EOF - default: + for { + select { + case <-ctx.Done(): + return io.EOF + default: + } + // verify throttler is happy, otherwise keep looping + if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWait(ctx) { + break + } } + if vc.tablePlan == nil { if len(rows.Fields) == 0 { return fmt.Errorf("expecting field event first, got: %v", rows) @@ -249,6 +256,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma if len(rows.Rows) == 0 { return nil } + // The number of rows we receive depends on the packet size set // for the row streamer. Since the packet size is roughly equivalent // to data size, this should map to a uniform amount of pages affected diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 0a201d5fccb..444676fe668 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -318,6 +318,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { defer vp.vr.stats.SecondsBehindMaster.Set(math.MaxInt64) var sbm int64 = -1 for { + // check throttler. + if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWait(ctx) { + continue + } + items, err := relay.Fetch() if err != nil { return err diff --git a/go/vt/vttablet/tabletserver/gc/tablegc.go b/go/vt/vttablet/tabletserver/gc/tablegc.go index 36716ae4cb0..d66fc9912ce 100644 --- a/go/vt/vttablet/tabletserver/gc/tablegc.go +++ b/go/vt/vttablet/tabletserver/gc/tablegc.go @@ -22,7 +22,6 @@ import ( "fmt" "math" "math/rand" - "net/http" "sort" "sync" "sync/atomic" @@ -43,9 +42,6 @@ import ( const ( leaderCheckInterval = 5 * time.Second purgeReentranceInterval = 1 * time.Minute - // throttleCheckDuration controls both how frequently the throttler is checked. as well as - // how long to sleep if throttler blocks us - throttleCheckDuration = 250 * time.Millisecond // evacHours is a hard coded, reasonable time for a table to spend in EVAC state evacHours = 72 throttlerAppName = "tablegc" @@ -58,12 +54,9 @@ var checkInterval = flag.Duration("gc_check_interval", 1*time.Hour, "Interval be var gcLifecycle = flag.String("table_gc_lifecycle", "hold,purge,evac,drop", "States for a DROP TABLE garbage collection cycle. Default is 'hold,purge,evac,drop', use any subset ('drop' implcitly always included)") var ( - sqlPurgeTable = `delete from %a limit 50` - sqlShowVtTables = `show tables like '\_vt\_%'` - sqlDropTable = "drop table if exists `%a`" - throttleFlags = &throttle.CheckFlags{ - LowPriority: true, - } + sqlPurgeTable = `delete from %a limit 50` + sqlShowVtTables = `show tables like '\_vt\_%'` + sqlDropTable = "drop table if exists `%a`" purgeReentranceFlag int64 ) @@ -91,17 +84,16 @@ type TableGC struct { shard string dbName string - lagThrottler *throttle.Throttler - isPrimary int64 - isOpen int64 + isPrimary int64 + isOpen int64 + + throttlerClient *throttle.Client env tabletenv.Env pool *connpool.Pool tabletTypeFunc func() topodatapb.TabletType ts *topo.Server - lastSuccessfulThrottleCheck time.Time - initMutex sync.Mutex purgeMutex sync.Mutex @@ -130,9 +122,9 @@ type GCStatus struct { // NewTableGC creates a table collector func NewTableGC(env tabletenv.Env, ts *topo.Server, tabletTypeFunc func() topodatapb.TabletType, lagThrottler *throttle.Throttler) *TableGC { collector := &TableGC{ - lagThrottler: lagThrottler, - isPrimary: 0, - isOpen: 0, + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckSelf), + isPrimary: 0, + isOpen: 0, env: env, tabletTypeFunc: tabletTypeFunc, @@ -408,21 +400,6 @@ func (collector *TableGC) checkTables(ctx context.Context) error { return nil } -func (collector *TableGC) throttleStatusOK(ctx context.Context) bool { - if time.Since(collector.lastSuccessfulThrottleCheck) <= throttleCheckDuration { - // if last check was OK just very recently there is no need to check again - return true - } - // It's time to run a throttler check - checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags) - if checkResult.StatusCode != http.StatusOK { - // sorry, we got throttled. - return false - } - collector.lastSuccessfulThrottleCheck = time.Now() - return true -} - // purge continuously purges rows from a table. // This function is non-reentrant: there's only one instance of this function running at any given time. // A timer keeps calling this function, so if it bails out (e.g. on error) it will later resume work @@ -466,9 +443,8 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro log.Infof("TableGC: purge begin for %s", tableName) for { - for !collector.throttleStatusOK(ctx) { - // Sorry, got throttled. Sleep some time, then check again - time.Sleep(throttleCheckDuration) + if !collector.throttlerClient.ThrottleCheckOKOrWait(ctx) { + continue } // OK, we're clear to go! diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 2bc200d53de..aa4c8929d71 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1579,7 +1579,7 @@ func (tsv *TabletServer) registerMigrationStatusHandler() { // registerThrottlerCheckHandlers registers throttler "check" requests func (tsv *TabletServer) registerThrottlerCheckHandlers() { - handle := func(path string, checkResultFunc func(ctx context.Context, appName string, remoteAddr string, flags *throttle.CheckFlags) *throttle.CheckResult) { + handle := func(path string, checkType throttle.ThrottleCheckType) { tsv.exporter.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { ctx := tabletenv.LocalContext() remoteAddr := r.Header.Get("X-Forwarded-For") @@ -1594,7 +1594,7 @@ func (tsv *TabletServer) registerThrottlerCheckHandlers() { flags := &throttle.CheckFlags{ LowPriority: (r.URL.Query().Get("p") == "low"), } - checkResult := checkResultFunc(ctx, appName, remoteAddr, flags) + checkResult := tsv.lagThrottler.CheckByType(ctx, appName, remoteAddr, flags, checkType) if checkResult.StatusCode == http.StatusNotFound && flags.OKIfNotExists { checkResult.StatusCode = http.StatusOK // 200 } @@ -1608,8 +1608,8 @@ func (tsv *TabletServer) registerThrottlerCheckHandlers() { } }) } - handle("/throttler/check", tsv.lagThrottler.Check) - handle("/throttler/check-self", tsv.lagThrottler.CheckSelf) + handle("/throttler/check", throttle.ThrottleCheckPrimaryWrite) + handle("/throttler/check-self", throttle.ThrottleCheckSelf) } // registerThrottlerStatusHandler registers a throttler "status" request diff --git a/go/vt/vttablet/tabletserver/throttle/client.go b/go/vt/vttablet/tabletserver/throttle/client.go new file mode 100644 index 00000000000..76f3ab1981f --- /dev/null +++ b/go/vt/vttablet/tabletserver/throttle/client.go @@ -0,0 +1,117 @@ +/* +Copyright 2021 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package throttle + +import ( + "context" + "net/http" + "time" +) + +const ( + throttleCheckDuration = 250 * time.Millisecond +) + +// Client construct is used by apps who wish to consult with a throttler. It encapsulates the check/throttling/backoff logic +type Client struct { + throttler *Throttler + appName string + checkType ThrottleCheckType + flags CheckFlags + + lastSuccessfulThrottleCheck time.Time +} + +// NewProductionClient creates a client suitable for foreground/production jobs, which have normal priority. +func NewProductionClient(throttler *Throttler, appName string, checkType ThrottleCheckType) *Client { + return &Client{ + throttler: throttler, + appName: appName, + checkType: checkType, + flags: CheckFlags{ + LowPriority: false, + }, + } +} + +// NewBackgroundClient creates a client suitable for background jobs, which have low priority over productio ntraffic, +// e.g. migration, table pruning, vreplication +func NewBackgroundClient(throttler *Throttler, appName string, checkType ThrottleCheckType) *Client { + return &Client{ + throttler: throttler, + appName: appName, + checkType: checkType, + flags: CheckFlags{ + LowPriority: true, + }, + } +} + +// ThrottleCheckOK checks the throttler, and returns 'true' when the throttler is satisfied. +// It does not sleep. +// The function caches results for a brief amount of time, hence it's safe and efficient to +// be called very frequenty. +// The function is not thread safe. +func (c *Client) ThrottleCheckOK(ctx context.Context) (throttleCheckOK bool) { + if c == nil { + // no client + return true + } + if c.throttler == nil { + // no throttler + return true + } + if time.Since(c.lastSuccessfulThrottleCheck) <= throttleCheckDuration { + // if last check was OK just very recently there is no need to check again + return true + } + // It's time to run a throttler check + checkResult := c.throttler.CheckByType(ctx, c.appName, "", &c.flags, c.checkType) + if checkResult.StatusCode != http.StatusOK { + return false + } + c.lastSuccessfulThrottleCheck = time.Now() + return true + +} + +// ThrottleCheckOKOrWait checks the throttler; if throttler is satisfied, the function returns 'true' mmediately, +// otherwise it briefly sleeps and returns 'false'. +// The function is not thread safe. +func (c *Client) ThrottleCheckOKOrWait(ctx context.Context) bool { + ok := c.ThrottleCheckOK(ctx) + if !ok { + time.Sleep(throttleCheckDuration) + } + return ok +} + +// Throttle throttles until the throttler is satisfied, or until context is cancelled. +// The function sleeps between throttle checks. +// The function is not thread safe. +func (c *Client) Throttle(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + default: + } + if c.ThrottleCheckOKOrWait(ctx) { + break + } + } +} diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index 7e5a8d3fcdb..787f3a93714 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -75,6 +75,13 @@ var ( replicationLagQuery = `select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from _vt.heartbeat` ) +type ThrottleCheckType int + +const ( + ThrottleCheckPrimaryWrite ThrottleCheckType = iota + ThrottleCheckSelf +) + func init() { rand.Seed(time.Now().UnixNano()) } @@ -792,6 +799,16 @@ func (throttler *Throttler) CheckSelf(ctx context.Context, appName string, remot return throttler.checkStore(ctx, appName, selfStoreName, remoteAddr, flags) } +// CheckSelf is checks the mysql/self metric, and is available on each tablet +func (throttler *Throttler) CheckByType(ctx context.Context, appName string, remoteAddr string, flags *CheckFlags, checkType ThrottleCheckType) (checkResult *CheckResult) { + switch checkType { + case ThrottleCheckSelf: + return throttler.CheckSelf(ctx, appName, remoteAddr, flags) + default: + return throttler.Check(ctx, appName, remoteAddr, flags) + } +} + // Status exports a status breakdown func (throttler *Throttler) Status() *ThrottlerStatus { return &ThrottlerStatus{ diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 32f1ce6f64d..8485e16858e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -23,7 +23,6 @@ import ( "errors" "net/http" "sync" - "time" "vitess.io/vitess/go/vt/servenv" @@ -43,14 +42,7 @@ import ( ) const ( - throttleCheckDuration = 250 * time.Millisecond - throttlerAppName = "vstreamer" -) - -var ( - throttleFlags = &throttle.CheckFlags{ - LowPriority: true, - } + throttlerAppName = "vstreamer" ) // Engine is the engine for handling vreplication streaming requests. @@ -98,8 +90,7 @@ type Engine struct { vstreamersCreated *stats.Counter vstreamersEndedWithErrors *stats.Counter - lagThrottler *throttle.Throttler - lastSuccessfulThrottleCheck time.Time + throttlerClient *throttle.Client } // NewEngine creates a new Engine. @@ -107,11 +98,11 @@ type Engine struct { // Open and Close can be called multiple times and are idempotent. func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine, lagThrottler *throttle.Throttler, cell string) *Engine { vse := &Engine{ - env: env, - ts: ts, - se: se, - cell: cell, - lagThrottler: lagThrottler, + env: env, + ts: ts, + se: se, + cell: cell, + throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckSelf), streamers: make(map[int]*uvstreamer), rowStreamers: make(map[int]*rowStreamer), @@ -194,37 +185,6 @@ func (vse *Engine) vschema() *vindexes.VSchema { return vse.lvschema.vschema } -func (vse *Engine) throttleStatusOK(ctx context.Context, sleep bool) bool { - if vse.lagThrottler == nil { - // no throttler - return true - } - if time.Since(vse.lastSuccessfulThrottleCheck) <= throttleCheckDuration { - // if last check was OK just very recently there is no need to check again - return true - } - // It's time to run a throttler check - checkResult := vse.lagThrottler.CheckSelf(ctx, throttlerAppName, "", throttleFlags) - if checkResult.StatusCode != http.StatusOK { - // sorry, we got throttled. - if sleep { - time.Sleep(throttleCheckDuration) - } - return false - } - vse.lastSuccessfulThrottleCheck = time.Now() - return true -} - -// throttle will wait until the throttler's "check-self" check is satisfied -func (vse *Engine) throttle(ctx context.Context) { - // We introduce throttling based on the tablet's "self" check, which means if the tablet itself is lagging, - // we hold off reads so as to ease the load and let it regain its health - for !vse.throttleStatusOK(ctx, true) { - // Sorry, got throttled. Sleep some time, then check again - } -} - // Stream starts a new stream. // This streams events from the binary logs func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go index d47539e759d..c7897b756ca 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer.go @@ -97,8 +97,8 @@ func (rs *resultStreamer) Stream() error { default: } - // check throttler. If required throttling, sleep ("true" argument) and retry loop - if !rs.vse.throttleStatusOK(rs.ctx, true) { + // check throttler. + if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { continue } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index 8245b174c53..95863d1384f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -244,8 +244,8 @@ func (rs *rowStreamer) streamQuery(conn *snapshotConn, send func(*binlogdatapb.V default: } - // check throttler. If required throttling, sleep ("true" argument) and retry loop - if !rs.vse.throttleStatusOK(rs.ctx, true) { + // check throttler. + if !rs.vse.throttlerClient.ThrottleCheckOKOrWait(rs.ctx) { continue } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index d2d4ecce43c..0e7b27b1f14 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -276,8 +276,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog throttledEvents := make(chan mysql.BinlogEvent) go func() { for { - // check throttler. If required throttling, sleep ("true" argument) and retry loop - if !vs.vse.throttleStatusOK(ctx, true) { + // check throttler. + if !vs.vse.throttlerClient.ThrottleCheckOKOrWait(ctx) { continue }