Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: throttled app configuration via vtctl UpdateThrottlerConfig #13351

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
66d2c0d
Adding ThrottledAppRule in ThrottlerConfig and in UpdateThrottlerConf…
shlomi-noach Jun 20, 2023
d5b04e0
support new throttle-app* flags in vtctl and vtctldclient commands
shlomi-noach Jun 20, 2023
8290c6b
refactor ThrottleApp into throttler package
shlomi-noach Jun 20, 2023
f127649
fix excessive flag
shlomi-noach Jun 20, 2023
3684635
WaitForThrottledApp
shlomi-noach Jun 20, 2023
6010db7
give realistic throttler config
shlomi-noach Jun 20, 2023
dbed3bf
Merge remote-tracking branch 'upstream/main' into tablet-throttler-th…
shlomi-noach Jun 26, 2023
1a20570
Only WatchSrvKeyspace once the SrvKeyspace entry is confirmed to exist
shlomi-noach Jun 27, 2023
8a3256f
WaitForSrvKeyspace before updating the keyspace
shlomi-noach Jun 27, 2023
332569d
only WatchSrvKeyspace _once_
shlomi-noach Jun 27, 2023
18c16d6
adapt test, since we can't use topo-based throttling to throttle a si…
shlomi-noach Jun 27, 2023
a37ec02
fix test migration status expectation
shlomi-noach Jun 27, 2023
22be670
code comment
shlomi-noach Jun 27, 2023
de54ced
update flag docs
shlomi-noach Jun 27, 2023
903dfde
resolved conflict
shlomi-noach Jul 6, 2023
dfde7dd
import package name
shlomi-noach Jul 6, 2023
7df25f1
typo
shlomi-noach Jul 6, 2023
79dc0dd
noCustomQuery
shlomi-noach Jul 6, 2023
b0d1180
add 'vreplication' name to all vreplication-related throttled apps
shlomi-noach Jul 11, 2023
0df77d7
undo last commit
shlomi-noach Jul 11, 2023
b795070
vcopier identifies as 'vcopier:vreplication...' and vplayer identifie…
shlomi-noach Jul 11, 2023
d375287
resolved conflict
shlomi-noach Jul 12, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions go/cmd/vtctldclient/command/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,47 @@ limitations under the License.
package command

import (
"time"

"github.com/spf13/cobra"

"vitess.io/vitess/go/cmd/vtctldclient/cli"

"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/topodata"
mattlord marked this conversation as resolved.
Show resolved Hide resolved
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
)

var (
// UpdateThrottlerConfig makes a UpdateThrottlerConfig gRPC call to a vtctld.
UpdateThrottlerConfig = &cobra.Command{
Use: "UpdateThrottlerConfig [--enable|--disable] [--threshold=<float64>] [--custom-query=<query>] [--check-as-check-self|--check-as-check-shard] <keyspace>",
Use: "UpdateThrottlerConfig [--enable|--disable] [--threshold=<float64>] [--custom-query=<query>] [--check-as-check-self|--check-as-check-shard] [--throttle-app=<name>] [--throttle-app-ratio=<float, range [0..1]>] [--throttle-app-duration=<duration>] <keyspace>",
Short: "Update the tablet throttler configuration for all tablets in the given keyspace (across all cells)",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandUpdateThrottlerConfig,
}
)

var updateThrottlerConfigOptions vtctldatapb.UpdateThrottlerConfigRequest
var (
updateThrottlerConfigOptions vtctldatapb.UpdateThrottlerConfigRequest
throttledAppRule topodata.ThrottledAppRule
throttledAppDuration time.Duration
)

func commandUpdateThrottlerConfig(cmd *cobra.Command, args []string) error {
keyspace := cmd.Flags().Arg(0)
cli.FinishedParsing(cmd)

updateThrottlerConfigOptions.CustomQuerySet = cmd.Flags().Changed("custom-query")
updateThrottlerConfigOptions.Keyspace = keyspace

throttledAppRule.ExpiresAt = logutil.TimeToProto(time.Now().Add(throttledAppDuration))
if throttledAppRule.Name != "" {
updateThrottlerConfigOptions.ThrottledApp = &throttledAppRule
}

_, err := client.UpdateThrottlerConfig(commandCtx, &updateThrottlerConfigOptions)
if err != nil {
return err
Expand All @@ -57,5 +72,10 @@ func init() {
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.CustomQuery, "custom-query", "", "custom throttler check query")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckSelf, "check-as-check-self", false, "/throttler/check requests behave as is /throttler/check-self was called")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckShard, "check-as-check-shard", false, "use standard behavior for /throttler/check requests")

UpdateThrottlerConfig.Flags().StringVar(&throttledAppRule.Name, "throttle-app", "", "an app name to throttle")
UpdateThrottlerConfig.Flags().Float64Var(&throttledAppRule.Ratio, "throttle-app-ratio", throttle.DefaultThrottleRatio, "ratio to throttle app (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().DurationVar(&throttledAppDuration, "throttle-app-duration", throttle.DefaultAppThrottleDuration, "duration after which throttled app rule expires (app specififed in --throttled-app)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why this isn't also part of throttledAppRule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ThrottledAppRule has an ExpiresAt, which is a declarative and absolute value to when the rule expires. However, for the user it's more convenient to speak about duration. If I'm having a problem right now in production and I want to throttle an app, I want to throttle it for, say, the next 4 hours. So I specify 4h. Vitess then computes and stores the absolute ExpiresAt.
Does that make sense?


Root.AddCommand(UpdateThrottlerConfig)
}
148 changes: 51 additions & 97 deletions go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package vrepl
import (
"flag"
"fmt"
"io"
"os"
"path"
"strings"
Expand Down Expand Up @@ -150,6 +149,12 @@ var (
`
)

const (
customThreshold = 5
throttlerEnabledTimeout = 60 * time.Second
useDefaultQuery = ""
mattlord marked this conversation as resolved.
Show resolved Hide resolved
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand Down Expand Up @@ -192,7 +197,6 @@ func TestMain(m *testing.M) {
if err := clusterInstance.StartKeyspace(*keyspace, []string{"-80", "80-"}, 1, false); err != nil {
return 1, err
}

vtgateInstance := clusterInstance.NewVtgateInstance()
// Start vtgate
if err := vtgateInstance.Setup(); err != nil {
Expand All @@ -216,29 +220,6 @@ func TestMain(m *testing.M) {

}

// direct per-tablet throttler API instruction
func throttleResponse(tablet *cluster.Vttablet, path string) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.VttabletProcess.TabletHostname, tablet.HTTPPort, path)
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, err
}

// direct per-tablet throttler API instruction
func throttleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp))
}

// direct per-tablet throttler API instruction
func unthrottleApp(tablet *cluster.Vttablet, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp))
}

func TestSchemaChange(t *testing.T) {
defer cluster.PanicHandler(t)

Expand All @@ -257,16 +238,34 @@ func TestSchemaChange(t *testing.T) {
err := clusterInstance.WaitForTabletsToHealthyInVtgate()
require.NoError(t, err)

_, err = throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, 0, "")
require.NoError(t, err)
t.Run("WaitForSrvKeyspace", func(t *testing.T) {
for _, ks := range clusterInstance.Keyspaces {
t.Run(ks.Name, func(t *testing.T) {
err := throttler.WaitForSrvKeyspace(clusterInstance, cell, ks.Name)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
})
}
})
t.Run("updating throttler config", func(t *testing.T) {
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, true, false, customThreshold, useDefaultQuery)
require.NoError(t, err)
})

for _, ks := range clusterInstance.Keyspaces {
for _, shard := range ks.Shards {
for _, tablet := range shard.Vttablets {
throttler.WaitForThrottlerStatusEnabled(t, tablet, true, nil, extendedMigrationWait)
}
t.Run("checking throttler config", func(t *testing.T) {
for _, ks := range clusterInstance.Keyspaces {
t.Run(ks.Name, func(t *testing.T) {
for _, shard := range ks.Shards {
t.Run(shard.Name, func(t *testing.T) {
for _, tablet := range shard.Vttablets {
t.Run(tablet.Alias, func(t *testing.T) {
throttler.WaitForThrottlerStatusEnabled(t, tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: customThreshold}, throttlerEnabledTimeout)
})
}
})
}
})
}
}
})

testWithInitialSchema(t)
t.Run("alter non_online", func(t *testing.T) {
Expand Down Expand Up @@ -412,18 +411,9 @@ func TestSchemaChange(t *testing.T) {
var uuid string

func() {
for _, shard := range shards {
// technically we only need to throttle on a REPLICA, because that's the
// vstreamer source; but it's OK to be on the safe side and throttle on all tablets. Doesn't
// change the essence of this test.
for _, tablet := range shard.Vttablets {
body, err := throttleApp(tablet, throttlerapp.VStreamerName)
defer unthrottleApp(tablet, throttlerapp.VStreamerName)

assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.VStreamerName)
}
}
_, err := throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.VStreamerName)
mattlord marked this conversation as resolved.
Show resolved Hide resolved
defer throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.VStreamerName)
require.NoError(t, err)

uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
Expand Down Expand Up @@ -520,24 +510,10 @@ func TestSchemaChange(t *testing.T) {
t.Run(fmt.Sprintf("PlannedReparentShard via throttling %d/2", (currentPrimaryTabletIndex+1)), func(t *testing.T) {

insertRows(t, 2)
for i := range shards {
var body string
var err error
switch i {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = throttleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
defer unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = throttleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
defer unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
_, err = throttler.ThrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.OnlineDDLName)
assert.NoError(t, err)
defer throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.OnlineDDLName)

uuid := testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)

t.Run("wait for migration to run", func(t *testing.T) {
Expand Down Expand Up @@ -585,22 +561,8 @@ func TestSchemaChange(t *testing.T) {
onlineddl.PrintQueryResult(os.Stdout, rs)
})
t.Run("unthrottle", func(t *testing.T) {
for i := range shards {
var body string
var err error
switch i {
case 0:
// this is the shard where we run PRS
// Use per-tablet throttling API
body, err = unthrottleApp(currentPrimaryTablet, throttlerapp.OnlineDDLName)
case 1:
// no PRS on this shard
// Use per-tablet throttling API
body, err = unthrottleApp(shards[i].Vttablets[0], throttlerapp.OnlineDDLName)
}
assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
_, err = throttler.UnthrottleAppAndWaitUntilTabletsConfirm(t, clusterInstance, throttlerapp.OnlineDDLName)
assert.NoError(t, err)
})
t.Run("expect completion", func(t *testing.T) {
_ = onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, extendedMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
Expand Down Expand Up @@ -818,36 +780,28 @@ func TestSchemaChange(t *testing.T) {
// - two shards as opposed to one
// - tablet throttling
t.Run("Revert a migration completed on one shard and cancelled on another", func(t *testing.T) {
// shard 0 will run normally, shard 1 will be throttled
defer unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
t.Run("throttle shard 1", func(t *testing.T) {
body, err := throttleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
})
// shard 0 will run normally, shard 1 will be postponed

var uuid string
t.Run("run migrations, expect 1st to complete, 2nd to be running", func(t *testing.T) {
uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
t.Run("run migrations, expect running on both shards", func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that most of the changes in this file helped to deflake the tests? Maybe it's another cherry-pick?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new design, it's impossible to tell vitess "throttle one shard but not this other shard". Previously, it was possible since we hit a shard's PRIMARY tablet's HTTP API directly.

So the new design means we need to change the tests. And we don't need to test something that we don't support...

uuid = testOnlineDDLStatement(t, alterTableTrivialStatement, "vitess --postpone-launch", providedUUID, providedMigrationContext, "vtgate", "test_val", "", true)
onlineddl.CheckLaunchMigration(t, &vtParams, shards[0:1], uuid, "-80", true)
{
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[:1], uuid, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards[:1], uuid, schema.OnlineDDLStatusComplete)
}
{
// shard 1 is throttled
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[1:], uuid, normalMigrationWait, schema.OnlineDDLStatusRunning)
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[1:], uuid, normalMigrationWait, schema.OnlineDDLStatusQueued)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards[1:], uuid, schema.OnlineDDLStatusRunning)
onlineddl.CheckMigrationStatus(t, &vtParams, shards[1:], uuid, schema.OnlineDDLStatusQueued)
}
})
t.Run("check cancel migration", func(t *testing.T) {
onlineddl.CheckCancelAllMigrations(t, &vtParams, 1)
})
t.Run("unthrottle shard 1", func(t *testing.T) {
body, err := unthrottleApp(shards[1].Vttablets[0], throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
t.Run("launch-all", func(t *testing.T) {
onlineddl.CheckLaunchAllMigrations(t, &vtParams, 0)
})
var revertUUID string
t.Run("issue revert migration", func(t *testing.T) {
Expand All @@ -859,12 +813,12 @@ func TestSchemaChange(t *testing.T) {
revertUUID = row.AsString("uuid", "")
assert.NotEmpty(t, revertUUID)
})
t.Run("expect one revert successful, another failed", func(t *testing.T) {
t.Run("migrations were cancelled, revert should impossible", func(t *testing.T) {
{
// shard 0 migration was complete. Revert should be successful
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards[:1], revertUUID, normalMigrationWait, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards[:1], revertUUID, schema.OnlineDDLStatusComplete)
onlineddl.CheckMigrationStatus(t, &vtParams, shards[:1], revertUUID, schema.OnlineDDLStatusFailed)
}
{
// shard 0 migration was cancelled. Revert should not be possible
Expand Down
Loading