vttablet throttling #6668

Merged
merged 36 commits into from
Oct 1, 2020
Commits
6b5617c
enable and set heartbeat in examples' vttablet-up.sh
shlomi-noach Sep 3, 2020
afb62af
majority of freno import
shlomi-noach Sep 3, 2020
63d29f7
using a lagThrottler from within tabletserver/tm_state
shlomi-noach Sep 3, 2020
4652408
Correct usage of Open/Close and leadership check. Kicking Operate().
shlomi-noach Sep 3, 2020
2ba4afc
fixed heartbeat_enable flag name
shlomi-noach Sep 3, 2020
fc01e9c
/throttler/status endpoint
shlomi-noach Sep 3, 2020
9d2882d
Pretty much complete logic, supporting reparenting and rotating accounts
shlomi-noach Sep 6, 2020
01cabe4
Pretty much complete logic, supporting reparenting and rotating accounts
shlomi-noach Sep 6, 2020
8213c8e
fix tabletserver unit test
shlomi-noach Sep 6, 2020
155124f
double check read_only state
shlomi-noach Sep 6, 2020
6573d44
Added endtoend throttler_test
shlomi-noach Sep 6, 2020
b5958e7
test no hosts, test restoring hosts
shlomi-noach Sep 6, 2020
1e4c01a
test the throttler\?
shlomi-noach Sep 6, 2020
c9807c8
enable heartbeat; test replication lag, test no hosts
shlomi-noach Sep 6, 2020
0dec65d
heartbeat interval for example apps: 250ms
shlomi-noach Sep 6, 2020
0813733
introducing -throttle_threshold tablet command line flag
shlomi-noach Sep 9, 2020
9ec3ad5
Merge branch 'master' into vttablet-throttle
shlomi-noach Sep 13, 2020
e39c8ed
copyright 2020
shlomi-noach Sep 13, 2020
b51b8f9
license in every file
shlomi-noach Sep 13, 2020
053f3e3
copyright 2020
shlomi-noach Sep 13, 2020
2f5ce5d
Adding freno's original license by GitHub
shlomi-noach Sep 17, 2020
e13ae74
Operate() starts on every Open() call, temrinates on any Close() call
shlomi-noach Sep 22, 2020
bbde649
Adding SuspendableTicker
shlomi-noach Sep 22, 2020
c3dfcab
changes Open()/Close() logic, using suspendable timers. Also, overcom…
shlomi-noach Sep 23, 2020
b9d4314
fixed state_manager_test
shlomi-noach Sep 23, 2020
4c050ef
Merge remote-tracking branch 'upstream/master' into vttablet-throttle
shlomi-noach Sep 24, 2020
992c38f
HTTP 200 OK when no replicas exist
shlomi-noach Sep 24, 2020
3669283
-throttle_tablet_types command line flag
shlomi-noach Sep 29, 2020
4b0a5a3
throttle according to throttle_tablet_types
shlomi-noach Sep 29, 2020
48e62e3
fixes per review
shlomi-noach Sep 29, 2020
62d2d82
fixes per review
shlomi-noach Sep 29, 2020
7f1de38
reorder throttler.Close()
shlomi-noach Sep 30, 2020
567271b
adapt state_manager test
shlomi-noach Sep 30, 2020
4d09259
throttler implicitly enables heartbeat
shlomi-noach Oct 1, 2020
047352a
removed DefaultMySQLPort
shlomi-noach Oct 1, 2020
60a4082
enable/disable heartbeat; fix tests
shlomi-noach Oct 1, 2020
2 changes: 1 addition & 1 deletion docker/mini/vttablet-mini-up.sh
@@ -51,7 +51,7 @@ vttablet \
-init_tablet_type $tablet_type \
-health_check_interval 5s \
-heartbeat_enable \
- -heartbeat_interval 500ms \
+ -heartbeat_interval 250ms \
-enable_semi_sync \
-enable_replication_reporter \
-backup_storage_implementation file \
2 changes: 2 additions & 0 deletions examples/compose/vttablet-up.sh
@@ -164,6 +164,8 @@ exec $VTROOT/bin/vttablet \
-health_check_interval 5s \
-enable_semi_sync \
-enable_replication_reporter \
+ -heartbeat_enable \
+ -heartbeat_interval 250ms \
-port $web_port \
-grpc_port $grpc_port \
-binlog_use_v3_resharding_mode=true \
2 changes: 2 additions & 0 deletions examples/local/scripts/vttablet-up.sh
@@ -47,6 +47,8 @@ vttablet \
-health_check_interval 5s \
-enable_semi_sync \
-enable_replication_reporter \
+ -heartbeat_enable \
+ -heartbeat_interval 250ms \
-backup_storage_implementation file \
-file_backup_storage_root $VTDATAROOT/backups \
-restore_from_backup \
197 changes: 197 additions & 0 deletions go/test/endtoend/tabletmanager/throttler/throttler_test.go
@@ -0,0 +1,197 @@
/*
Copyright 2020 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package master

import (
"flag"
"fmt"
"net/http"
"os"
"testing"
"time"

"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"

"vitess.io/vitess/go/test/endtoend/cluster"

"github.com/stretchr/testify/assert"
)

var (
clusterInstance *cluster.LocalProcessCluster
masterTablet cluster.Vttablet
replicaTablet cluster.Vttablet
hostname = "localhost"
keyspaceName = "ks"
cell = "zone1"
sqlSchema = `
create table t1(
id bigint,
value varchar(16),
primary key(id)
) Engine=InnoDB;
`

vSchema = `
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"t1": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
}
}
}`

httpClient = base.SetupHTTPClient(time.Second)
checkAPIPath = "throttler/check"
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, hostname)
defer clusterInstance.Teardown()

// Start topo server
err := clusterInstance.StartTopo()
if err != nil {
return 1
}

// Set extra tablet args for lock timeout
clusterInstance.VtTabletExtraArgs = []string{
"-lock_tables_timeout", "5s",
"-watch_replication_stream",
"-enable_replication_reporter",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}
// We do not need semiSync for this test case.
clusterInstance.EnableSemiSync = false

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: sqlSchema,
VSchema: vSchema,
}

if err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false); err != nil {
return 1
}

// Collect table paths and ports
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
for _, tablet := range tablets {
if tablet.Type == "master" {
masterTablet = *tablet
} else if tablet.Type != "rdonly" {
replicaTablet = *tablet
}
}

return m.Run()
}()
os.Exit(exitCode)
}

func throttleCheck() (*http.Response, error) {
return httpClient.Head(fmt.Sprintf("http://localhost:%d/%s", masterTablet.HTTPPort, checkAPIPath))
}

func TestThrottlerBeforeMetricsCollected(t *testing.T) {
defer cluster.PanicHandler(t)

// Immediately after startup, we expect this response:
// {"StatusCode":404,"Value":0,"Threshold":0,"Message":"No such metric"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
}

func TestThrottlerAfterMetricsCollected(t *testing.T) {
Collaborator:
This test and the next one do a fair amount of time.Sleep. Is there no way to speed these tests up a bit?

Contributor Author:
I guess the way to speed this up is to make the throttler's roster polling interval smaller, and to implement that by adding a new configuration variable. However, does it make sense to add this configuration variable only for the purposes of CI tests?

Member:
Does it make sense for users to configure the polling interval?

Contributor Author:
See my answer to @harshit-gangal: #6668 (comment)

Based on experience, these are good numbers in production. The risk in letting users change them is that they will choose intervals that are too small (the server is too busy, and the result is meaningless anyway since heartbeat injection gives us some error margin) or too relaxed (the computation is not reliable enough, causing high lag). I'm of the opinion that we should be opinionated to a reasonable degree, and I feel comfortable with these values.


defer cluster.PanicHandler(t)

time.Sleep(10 * time.Second)
// By this time metrics will have been collected. We expect no lag, and something like:
// {"StatusCode":200,"Value":0.282278,"Threshold":1,"Message":""}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

func TestLag(t *testing.T) {
defer cluster.PanicHandler(t)

{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StopReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
// Lag will have accumulated
// {"StatusCode":429,"Value":4.864921,"Threshold":1,"Message":"Threshold exceeded"}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusTooManyRequests, resp.StatusCode)
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
assert.NoError(t, err)

time.Sleep(5 * time.Second)
// Restore
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}

func TestNoReplicas(t *testing.T) {
defer cluster.PanicHandler(t)
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "RDONLY")
assert.NoError(t, err)

time.Sleep(10 * time.Second)
// This makes no REPLICA servers available. We expect something like:
// {"StatusCode":200,"Value":0,"Threshold":1,"Message":""}
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
{
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ChangeTabletType", replicaTablet.Alias, "REPLICA")
assert.NoError(t, err)

time.Sleep(10 * time.Second)
// Restore valid replica
resp, err := throttleCheck()
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
}
}
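
One possible way to shorten the fixed sleeps discussed in the review thread above, without adding a configuration variable, is to poll the check endpoint until it returns the expected status or a deadline passes. The following is only a sketch under stated assumptions: `waitForStatus`, `newFakeCheckServer`, and `runDemo` are hypothetical names that do not appear in this PR, and the fake server merely stands in for a tablet that answers 404 before metrics are collected and 200 afterwards.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
	"time"
)

// waitForStatus is a hypothetical helper (not part of the PR). It issues HEAD
// requests against url every interval until the response carries wantStatus or
// the timeout elapses, and returns the last status code it observed.
func waitForStatus(client *http.Client, url string, wantStatus int, interval, timeout time.Duration) (int, error) {
	deadline := time.Now().Add(timeout)
	lastStatus := 0
	for {
		resp, err := client.Head(url)
		if err == nil {
			lastStatus = resp.StatusCode
			resp.Body.Close()
			if lastStatus == wantStatus {
				return lastStatus, nil
			}
		}
		if time.Now().After(deadline) {
			return lastStatus, fmt.Errorf("timed out waiting for HTTP %d, last status %d", wantStatus, lastStatus)
		}
		time.Sleep(interval)
	}
}

// newFakeCheckServer stands in for a tablet's check endpoint (an assumption
// for this sketch): it answers 404 for the first notFoundCount requests and
// 200 for every request after that.
func newFakeCheckServer(notFoundCount int32) *httptest.Server {
	var calls int32
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if atomic.AddInt32(&calls, 1) <= notFoundCount {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		w.WriteHeader(http.StatusOK)
	}))
}

// runDemo polls the fake server until it reports 200 OK.
func runDemo() (int, error) {
	srv := newFakeCheckServer(3)
	defer srv.Close()
	client := &http.Client{Timeout: time.Second}
	return waitForStatus(client, srv.URL+"/throttler/check", http.StatusOK, 10*time.Millisecond, 2*time.Second)
}

func main() {
	status, err := runDemo()
	fmt.Println(status, err)
}
```

In a test such as TestThrottlerAfterMetricsCollected, a helper like this could replace the unconditional 10-second sleep with a 10-second upper bound, so the test finishes as soon as the throttler reports the expected status. Whether that is acceptable for the lag tests, where the point is to let lag accumulate, is a separate judgment call.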
2 changes: 1 addition & 1 deletion go/vt/throttler/throttler_test.go
@@ -1,5 +1,5 @@
/*
- Copyright 2019 The Vitess Authors.
+ Copyright 2020 The Vitess Authors.
deepthi marked this conversation as resolved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
2 changes: 2 additions & 0 deletions go/vt/vttablet/endtoend/vstreamer_test.go
@@ -58,6 +58,8 @@ func TestSchemaVersioning(t *testing.T) {
tsv := framework.Server
tsv.EnableHistorian(false)
tsv.SetTracking(false)
+ tsv.EnableHeartbeat(false)
+ defer tsv.EnableHeartbeat(true)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)

6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
@@ -128,3 +128,9 @@ func (rt *ReplTracker) Status() (time.Duration, error) {
// rt.mode == tabletenv.Poller
return rt.poller.Status()
}
+
+ // EnableHeartbeat enables or disables writes of heartbeat. This functionality
+ // is only used by tests.
+ func (rt *ReplTracker) EnableHeartbeat(enable bool) {
+ rt.hw.enableWrites(enable)
+ }
@@ -82,7 +82,7 @@ func TestReplTracker(t *testing.T) {
rt.InitDBConfig(target, mysqld)
assert.Equal(t, tabletenv.Polling, rt.mode)
assert.Equal(t, mysqld, rt.poller.mysqld)
- assert.False(t, rt.hw.enabled)
+ assert.True(t, rt.hw.enabled)
assert.False(t, rt.hr.enabled)

rt.MakeNonMaster()
16 changes: 11 additions & 5 deletions go/vt/vttablet/tabletserver/repltracker/writer.go
@@ -73,9 +73,6 @@ type heartbeatWriter struct {
// newHeartbeatWriter creates a new heartbeatWriter.
func newHeartbeatWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *heartbeatWriter {
config := env.Config()
- if config.ReplicationTracker.Mode != tabletenv.Heartbeat {
- return &heartbeatWriter{}
- }
heartbeatInterval := config.ReplicationTracker.HeartbeatIntervalSeconds.Get()
return &heartbeatWriter{
env: env,
@@ -111,7 +108,7 @@ func (w *heartbeatWriter) Open() {
log.Info("Hearbeat Writer: opening")

w.pool.Open(w.env.Config().DB.AppWithDB(), w.env.Config().DB.DbaWithDB(), w.env.Config().DB.AppDebugWithDB())
- w.ticks.Start(w.writeHeartbeat)
+ w.enableWrites(true)
w.isOpen = true
}

@@ -126,7 +123,7 @@ func (w *heartbeatWriter) Close() {
return
}

- w.ticks.Stop()
+ w.enableWrites(false)
w.pool.Close()
w.isOpen = false
log.Info("Hearbeat Writer: closed")
@@ -182,3 +179,12 @@ func (w *heartbeatWriter) recordError(err error) {
w.errorLog.Errorf("%v", err)
writeErrors.Add(1)
}
+
+ // enableWrites activates or deactivates heartbeat writes
+ func (w *heartbeatWriter) enableWrites(enable bool) {
+ if enable {
+ w.ticks.Start(w.writeHeartbeat)
+ } else {
+ w.ticks.Stop()
+ }
+ }
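
The enableWrites change above routes both Open/Close and the test-only EnableHeartbeat through a single on/off switch for the periodic write. As an illustration of that pattern only, here is a minimal sketch built on the standard library's `time.Ticker`. It is not the heartbeatWriter from the diff and does not use the timer type that `w.ticks` refers to; `toggleWriter`, `writeCount`, and `runToggleDemo` are hypothetical names, and the "write" is just a counter increment.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// toggleWriter is a hypothetical stand-in for a periodic writer whose writes
// can be switched on and off at runtime.
type toggleWriter struct {
	mu       sync.Mutex
	interval time.Duration
	stop     chan struct{} // non-nil while the write loop is running
	done     chan struct{} // closed by the loop when it has exited
	writes   int64         // number of simulated writes, updated atomically
}

// enableWrites starts or stops the write loop. Repeated calls with the same
// value are no-ops, so callers do not need to track the current state.
func (w *toggleWriter) enableWrites(enable bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if enable {
		if w.stop != nil {
			return // already running
		}
		w.stop, w.done = make(chan struct{}), make(chan struct{})
		go w.loop(w.stop, w.done)
		return
	}
	if w.stop == nil {
		return // already stopped
	}
	close(w.stop)
	<-w.done // wait for the loop to exit so no write happens after we return
	w.stop, w.done = nil, nil
}

// loop performs one simulated write per tick until stop is closed.
func (w *toggleWriter) loop(stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	ticker := time.NewTicker(w.interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			atomic.AddInt64(&w.writes, 1)
		}
	}
}

func (w *toggleWriter) writeCount() int64 { return atomic.LoadInt64(&w.writes) }

// runToggleDemo enables writes, lets them run briefly, disables them, and
// samples the counter at each stage.
func runToggleDemo() (whileEnabled, atStop, afterPause int64) {
	w := &toggleWriter{interval: 5 * time.Millisecond}
	w.enableWrites(true)
	w.enableWrites(true) // second call is a no-op
	time.Sleep(60 * time.Millisecond)
	whileEnabled = w.writeCount()
	w.enableWrites(false)
	atStop = w.writeCount()
	time.Sleep(30 * time.Millisecond)
	afterPause = w.writeCount()
	return
}

func main() {
	whileEnabled, atStop, afterPause := runToggleDemo()
	fmt.Println(whileEnabled > 0, atStop == afterPause)
}
```

The design choice worth noting in this sketch is that disabling blocks until the loop goroutine has exited, which is what makes the counter stable afterwards. Whether the timer used in the diff gives the same guarantee is not something this page states.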
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager.go
@@ -105,6 +105,7 @@ type stateManager struct {
txThrottler txThrottler
te txEngine
messager subComponent
+ throttler lagThrottler

// hcticks starts on initialiazation and runs forever.
hcticks *timer.Timer
@@ -155,6 +156,11 @@ type (
Open() error
Close()
}
+
+ lagThrottler interface {
+ Open() error
+ Close()
+ }
)

// Init performs the second phase of initialization.
@@ -403,6 +409,7 @@ func (sm *stateManager) serveMaster() error {
return err
}
sm.messager.Open()
+ sm.throttler.Open()
sm.setState(topodatapb.TabletType_MASTER, StateServing)
return nil
}
@@ -422,6 +429,7 @@ func (sm *stateManager) unserveMaster() error {
}

func (sm *stateManager) serveNonMaster(wantTabletType topodatapb.TabletType) error {
+ sm.throttler.Close()
sm.messager.Close()
sm.tracker.Close()
sm.se.MakeNonMaster()
@@ -469,6 +477,7 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error {
}

func (sm *stateManager) unserveCommon() {
+ sm.throttler.Close()
sm.messager.Close()
sm.te.Close()
sm.qe.StopServing()
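
The state_manager.go hunks above add a two-method lagThrottler interface and wire it so that the throttler is opened when the tablet serves as master and closed on the non-master and unserve paths. A small interface like this is easy to replace with a test double. The sketch below illustrates that lifecycle under stated assumptions: `recordingThrottler` and `miniStateManager` are hypothetical names, the real stateManager has many more components, and unlike the diff (which calls `sm.throttler.Open()` without using its return value) this sketch propagates the error.

```go
package main

import "fmt"

// lagThrottler mirrors the two-method interface added in the diff.
type lagThrottler interface {
	Open() error
	Close()
}

// recordingThrottler is a hypothetical test double that records every call.
type recordingThrottler struct {
	isOpen bool
	calls  []string
}

func (r *recordingThrottler) Open() error {
	r.calls = append(r.calls, "Open")
	r.isOpen = true
	return nil
}

func (r *recordingThrottler) Close() {
	r.calls = append(r.calls, "Close")
	r.isOpen = false
}

// miniStateManager is a deliberately tiny stand-in for stateManager that keeps
// only the throttler transitions shown in the hunks above.
type miniStateManager struct {
	throttler lagThrottler
}

// serveMaster opens the throttler. Propagating the error is a choice made for
// this sketch, not something the diff does.
func (sm *miniStateManager) serveMaster() error {
	return sm.throttler.Open()
}

// serveNonMaster closes the throttler before the tablet serves as a non-master.
func (sm *miniStateManager) serveNonMaster() {
	sm.throttler.Close()
}

// unserveCommon closes the throttler when the tablet stops serving.
func (sm *miniStateManager) unserveCommon() {
	sm.throttler.Close()
}

func main() {
	rt := &recordingThrottler{}
	sm := &miniStateManager{throttler: rt}
	if err := sm.serveMaster(); err != nil {
		fmt.Println("open failed:", err)
		return
	}
	sm.serveNonMaster()
	fmt.Println(rt.calls, rt.isOpen)
}
```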