Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tablet throttler: read and use MySQL host metrics #16904

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
71d98fe
mysqlctl proto: adding ThrottlerMetrics rpc
shlomi-noach Sep 23, 2024
9922731
naming: SystemMetrics instead of ThrottlerMetrics
shlomi-noach Sep 25, 2024
214db0d
implement SystemMetrics
shlomi-noach Sep 25, 2024
5660b77
tabletmanager: MysqlSystemMetrics rpc
shlomi-noach Sep 25, 2024
2ec9f75
implement SystemMetrics
shlomi-noach Sep 25, 2024
0741a2b
pass TabletAlias to throttler
shlomi-noach Sep 26, 2024
0b3666b
implement MysqlSystemMetrics
shlomi-noach Sep 26, 2024
20e0577
force check result
shlomi-noach Sep 26, 2024
df99856
unit test
shlomi-noach Sep 26, 2024
a5d76d3
resolved conflict
shlomi-noach Sep 26, 2024
098e6a4
remove bogus files
shlomi-noach Sep 26, 2024
338c52c
Merge branch 'main' into throttler-mysql-metrics
shlomi-noach Sep 26, 2024
4250909
resolved conflict
shlomi-noach Sep 29, 2024
07a3461
remove excessive file, caused by merge conflict
shlomi-noach Sep 29, 2024
2c48d36
Rename 'SystemMetrics'->'HostMetrics'
shlomi-noach Sep 29, 2024
0d73d30
better doclet
shlomi-noach Sep 29, 2024
e3fccae
use vtrpcpb.RPCError
shlomi-noach Sep 29, 2024
757cada
"datadir-used-ratio"
shlomi-noach Sep 29, 2024
7dd4b8b
internal refactor: SelfMetricReadParams
shlomi-noach Sep 30, 2024
73fd43f
Merge branch 'main' into throttler-use-mysql-metrics
shlomi-noach Oct 1, 2024
9166093
using threads_running as loadavg does not produce meaningful info on …
shlomi-noach Oct 7, 2024
545112f
adding MysqldLoadAvgMetricName and MysqldDatadirUsedRatioMetricName
shlomi-noach Oct 7, 2024
72b09b9
moredebug info in case of error
shlomi-noach Oct 7, 2024
91d2e42
return empty (zero) result for loadavg when not on linux
shlomi-noach Oct 7, 2024
38b9d73
return empty (zero) result for loadavg when not on linux
shlomi-noach Oct 7, 2024
adfcf6d
SelfMetricReadParams includes TabletManagerClient and TabletInfo
shlomi-noach Oct 7, 2024
d8b6ec2
pass tmClient and tabletInfo to self metrics
shlomi-noach Oct 7, 2024
6726285
Merge branch 'main' into throttler-use-mysql-metrics
shlomi-noach Oct 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -779,16 +779,16 @@ func TestUpdateAppCheckedMetrics(t *testing.T) {
}
waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED)
})
t.Run("assigning 'loadavg' metrics to 'test' app", func(t *testing.T) {
t.Run("assigning 'threads_running' metrics to 'test' app", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: "loadavg", Threshold: 7777}
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.ThreadsRunningMetricName.String(), Threshold: 7777}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{}
appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{
testAppName.String(): {Names: []string{"loadavg"}},
testAppName.String(): {Names: []string{base.ThreadsRunningMetricName.String()}},
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics)
assert.NoError(t, err)
Expand All @@ -802,18 +802,18 @@ func TestUpdateAppCheckedMetrics(t *testing.T) {
for _, tablet := range []cluster.Vttablet{*primaryTablet, *replicaTablet} {
throttler.WaitForThrottlerStatusEnabled(t, &clusterInstance.VtctldClientProcess, &tablet, true, &throttler.Config{Query: throttler.DefaultQuery, Threshold: unreasonablyLowThreshold.Seconds()}, throttlerEnabledTimeout)
}
t.Run("validating OK response from throttler since it's checking loadavg", func(t *testing.T) {
t.Run("validating OK response from throttler since it's checking threads_running", func(t *testing.T) {
if !waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})
})
t.Run("assigning 'loadavg,lag' metrics to 'test' app", func(t *testing.T) {
t.Run("assigning 'threads_running,lag' metrics to 'test' app", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{}
appCheckedMetrics := map[string]*topodatapb.ThrottlerConfig_MetricNames{
testAppName.String(): {Names: []string{"loadavg,lag"}},
testAppName.String(): {Names: []string{base.ThreadsRunningMetricName.String(), base.LagMetricName.String()}},
}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, appCheckedMetrics)
assert.NoError(t, err)
Expand All @@ -833,7 +833,7 @@ func TestUpdateAppCheckedMetrics(t *testing.T) {
})
t.Run("removing assignment from 'test' app and restoring defaults", func(t *testing.T) {
{
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: "loadavg", Threshold: 0}
req := &vtctldatapb.UpdateThrottlerConfigRequest{MetricName: base.ThreadsRunningMetricName.String(), Threshold: 0}
_, err := throttler.UpdateThrottlerTopoConfig(clusterInstance, req, nil, nil)
assert.NoError(t, err)
}
Expand Down
8 changes: 5 additions & 3 deletions go/test/endtoend/throttler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat
selfCheckURL := fmt.Sprintf("http://localhost:%d/throttler/check-self", tablet.HTTPPort)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(500 * time.Millisecond)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
Expand All @@ -548,8 +548,10 @@ func WaitForValidData(t *testing.T, tablet *cluster.Vttablet, timeout time.Durat
}
select {
case <-ctx.Done():
t.Errorf("timed out waiting for %s tablet's throttler to return a valid result after %v; last seen value: %+v",
tablet.Alias, timeout, checkResp)
respByte, _ := io.ReadAll(checkResp.Body)
body := string(respByte)
t.Errorf("timed out waiting for %s tablet's throttler to return a valid result after %v; last seen result: %+v",
tablet.Alias, timeout, body)
return
case <-ticker.C:
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/mysqld.go
Original file line number Diff line number Diff line change
Expand Up @@ -1333,7 +1333,7 @@ func hostMetrics(ctx context.Context, cnf *Mycnf) (*mysqlctlpb.HostMetricsRespon
_ = func() error {
metric := newMetric("loadavg")
if runtime.GOOS != "linux" {
return withError(metric, fmt.Errorf("loadavg metric is only available on Linux"))
return nil
}
content, err := os.ReadFile("/proc/loadavg")
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/base/metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/patrickmn/go-cache"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

// MetricsQueryType indicates the type of metrics query on MySQL backend. See following.
Expand Down Expand Up @@ -142,13 +143,13 @@ func (metric *ThrottleMetric) WithError(err error) *ThrottleMetric {

// ReadThrottleMetrics returns a metric for the given probe. Either by explicit query
// or via SHOW REPLICA STATUS
func ReadThrottleMetrics(ctx context.Context, probe *Probe, metricsFunc func(context.Context) ThrottleMetrics) ThrottleMetrics {
func ReadThrottleMetrics(ctx context.Context, probe *Probe, tmClient tmclient.TabletManagerClient, metricsFunc func(context.Context, tmclient.TabletManagerClient) ThrottleMetrics) ThrottleMetrics {
if metrics := getCachedThrottleMetrics(probe); metrics != nil {
return metrics
}

started := time.Now()
throttleMetrics := metricsFunc(ctx)
throttleMetrics := metricsFunc(ctx, tmClient)

go func(metrics ThrottleMetrics, started time.Time) {
stats.GetOrNewGauge("ThrottlerProbesLatency", "probes latency").Set(time.Since(started).Nanoseconds())
Expand Down
12 changes: 7 additions & 5 deletions go/vt/vttablet/tabletserver/throttle/base/metric_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ func (names MetricNames) Unique() MetricNames {
}

const (
DefaultMetricName MetricName = "default"
LagMetricName MetricName = "lag"
ThreadsRunningMetricName MetricName = "threads_running"
CustomMetricName MetricName = "custom"
LoadAvgMetricName MetricName = "loadavg"
DefaultMetricName MetricName = "default"
LagMetricName MetricName = "lag"
ThreadsRunningMetricName MetricName = "threads_running"
CustomMetricName MetricName = "custom"
LoadAvgMetricName MetricName = "loadavg"
MysqldLoadAvgMetricName MetricName = "mysqld-loadavg"
MysqldDatadirUsedRatioMetricName MetricName = "mysqld-datadir-used-ratio"
)

func (metric MetricName) DefaultScope() Scope {
Expand Down
11 changes: 10 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/base/self_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,24 @@ import (
"fmt"
"strconv"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

// SelfMetricReadParams bundles the inputs handed to a SelfMetric's Read function, so that
// new inputs can be added without changing the Read signature of every metric.
type SelfMetricReadParams struct {
// Throttler gives metrics access to throttler state, e.g. the custom metrics query.
Throttler ThrottlerMetricsPublisher
// Conn is the connection handed to query-based metrics.
// NOTE(review): presumably only populated for metrics whose RequiresConn() is true — confirm against caller.
Conn *connpool.Conn
// TmClient is the tablet manager client used by metrics that are read over RPC.
TmClient tmclient.TabletManagerClient
// TabletInfo identifies the tablet that RPC-based metrics are read from.
TabletInfo *topo.TabletInfo
}

type SelfMetric interface {
Name() MetricName
DefaultScope() Scope
DefaultThreshold() float64
RequiresConn() bool
Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric
Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric
}

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ package base

import (
"context"

"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var _ SelfMetric = registerSelfMetric(&CustomQuerySelfMetric{})
Expand All @@ -43,6 +41,6 @@ func (m *CustomQuerySelfMetric) RequiresConn() bool {
return true
}

func (m *CustomQuerySelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, throttler.GetCustomMetricsQuery())
func (m *CustomQuerySelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, params.Conn, params.Throttler.GetCustomMetricsQuery())
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package base
import (
"context"
"fmt"

"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var _ SelfMetric = registerSelfMetric(&DefaultSelfMetric{})
Expand All @@ -44,7 +42,7 @@ func (m *DefaultSelfMetric) RequiresConn() bool {
return false
}

func (m *DefaultSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
func (m *DefaultSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
return &ThrottleMetric{
Err: fmt.Errorf("unexpected direct call to DefaultSelfMetric.Read"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var (
Expand Down Expand Up @@ -65,6 +64,6 @@ func (m *LagSelfMetric) RequiresConn() bool {
return true
}

func (m *LagSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, conn, m.GetQuery())
func (m *LagSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
return ReadSelfMySQLThrottleMetric(ctx, params.Conn, m.GetQuery())
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,6 @@ import (
"runtime"
"strconv"
"strings"

"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
)

var (
loadavgOnlyAvailableOnLinuxMetric = &ThrottleMetric{
Scope: SelfScope,
Err: fmt.Errorf("loadavg metric is only available on Linux"),
}
)

var _ SelfMetric = registerSelfMetric(&LoadAvgSelfMetric{})
Expand All @@ -55,13 +46,13 @@ func (m *LoadAvgSelfMetric) RequiresConn() bool {
return false
}

func (m *LoadAvgSelfMetric) Read(ctx context.Context, throttler ThrottlerMetricsPublisher, conn *connpool.Conn) *ThrottleMetric {
if runtime.GOOS != "linux" {
return loadavgOnlyAvailableOnLinuxMetric
}
func (m *LoadAvgSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
metric := &ThrottleMetric{
Scope: SelfScope,
}
if runtime.GOOS != "linux" {
return metric
}
{
content, err := os.ReadFile("/proc/loadavg")
if err != nil {
Expand Down
156 changes: 156 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package base

import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

"vitess.io/vitess/go/timer"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

var (
// mysqlHostMetricsRpcTimeout bounds each individual MysqlHostMetrics RPC.
mysqlHostMetricsRpcTimeout = 5 * time.Second
// mysqlHostMetricsRateLimit is the interval given to the rate limiter that gates MysqlHostMetrics RPCs.
mysqlHostMetricsRateLimit = 10 * time.Second
// mysqlHostMetricsRateLimiter holds the currently published rate limiter; nil when none is active.
mysqlHostMetricsRateLimiter atomic.Pointer[timer.RateLimiter]
// lastMySQLHostMetricsResponse holds the response of the most recent successful MysqlHostMetrics RPC;
// nil until the first successful read.
lastMySQLHostMetricsResponse atomic.Pointer[tabletmanagerdatapb.MysqlHostMetricsResponse]
)

// getMysqlMetricsRateLimiter returns the shared rate limiter for MySQL host metrics reads, creating
// and publishing one if none is currently published. A limiter created here stays active until the
// given context is cancelled, at which point it is stopped and unpublished.
// This function is expected to be called sequentially, but nonetheless it offers _some_ concurrent
// safety. Namely, that a created rate limiter is guaranteed to be cleaned up, and that cleaning up
// one rate limiter never unpublishes a different (newer) one.
func getMysqlMetricsRateLimiter(ctx context.Context, rateLimit time.Duration) *timer.RateLimiter {
	if rateLimiter := mysqlHostMetricsRateLimiter.Load(); rateLimiter != nil {
		return rateLimiter
	}
	rateLimiter := timer.NewRateLimiter(rateLimit)
	// Publish before starting the cleanup goroutine. Otherwise, with an already-cancelled context,
	// the cleanup could run first and the subsequent Store would leave a stopped limiter published
	// forever, silently suppressing all further metric reads.
	mysqlHostMetricsRateLimiter.Store(rateLimiter)
	go func() {
		<-ctx.Done()
		rateLimiter.Stop()
		// Only unpublish the limiter this goroutine owns; a concurrent caller may have
		// published a newer one in the meantime.
		mysqlHostMetricsRateLimiter.CompareAndSwap(rateLimiter, nil)
	}()
	return rateLimiter
}

// readMysqlHostMetrics reads MySQL host metrics sporadically from the tablet manager (which in turn
// reads them from the mysql daemon). The read goes through the shared rate limiter, which decides
// whether the RPC is actually issued on a given call. When the RPC succeeds, the response is cached
// in lastMySQLHostMetricsResponse; when it fails, the previously cached response is left untouched
// and the error is returned.
// The idea is that it is very wasteful to read these metrics for every single check. E.g. right now
// the throttler can issue 4 reads per second, and going through two RPCs each time just to get the
// disk space usage is excessive. Even the load average on the MySQL server does not change that fast.
func readMysqlHostMetrics(ctx context.Context, params *SelfMetricReadParams) error {
	switch {
	case params.TmClient == nil:
		return fmt.Errorf("tmClient is nil")
	case params.TabletInfo == nil:
		return fmt.Errorf("tabletInfo is nil")
	}
	// fetch performs a single, time-bounded RPC and caches a successful response.
	fetch := func() error {
		rpcCtx, cancel := context.WithTimeout(ctx, mysqlHostMetricsRpcTimeout)
		defer cancel()

		resp, err := params.TmClient.MysqlHostMetrics(rpcCtx, params.TabletInfo.Tablet, &tabletmanagerdatapb.MysqlHostMetricsRequest{})
		if err == nil {
			lastMySQLHostMetricsResponse.Store(resp)
		}
		return err
	}
	return getMysqlMetricsRateLimiter(ctx, mysqlHostMetricsRateLimit).Do(fetch)
}

// getMysqlHostMetric gets a single named metric from the MySQL host metrics. It first calls
// readMysqlHostMetrics (which may or may not issue a fresh RPC to the tablet manager, which
// then reads from the mysql daemon) and then looks the metric up in the cached response.
//
// The returned metric always has SelfScope. Its Err is set when:
//   - reading the host metrics failed (the read error),
//   - no response has been cached yet (ErrNoResultYet),
//   - the response carries no host metrics, or has no entry by the given name (ErrNoSuchMetric),
//   - the entry itself reports an error (that error's message).
func getMysqlHostMetric(ctx context.Context, params *SelfMetricReadParams, mysqlHostMetricName string) *ThrottleMetric {
	metric := &ThrottleMetric{
		Scope: SelfScope,
	}
	if err := readMysqlHostMetrics(ctx, params); err != nil {
		return metric.WithError(err)
	}
	resp := lastMySQLHostMetricsResponse.Load()
	if resp == nil {
		return metric.WithError(ErrNoResultYet)
	}
	if resp.HostMetrics == nil {
		// A response without a host metrics payload cannot contain the requested metric;
		// report that rather than dereferencing a nil message.
		return metric.WithError(ErrNoSuchMetric)
	}
	mysqlMetric := resp.HostMetrics.Metrics[mysqlHostMetricName]
	if mysqlMetric == nil {
		return metric.WithError(ErrNoSuchMetric)
	}
	metric.Value = mysqlMetric.Value
	if mysqlMetric.Error != nil {
		metric.Err = errors.New(mysqlMetric.Error.Message)
	}
	return metric
}

// Hand both mysqld host metrics to registerSelfMetric when package-level variables are initialized.
// The blank, typed assignments also require registerSelfMetric's result to be a SelfMetric.
// NOTE(review): registerSelfMetric is defined elsewhere in this package; presumably it adds the
// metric to the package's registry of known self metrics — confirm.
var _ SelfMetric = registerSelfMetric(&MysqldLoadAvgSelfMetric{})
var _ SelfMetric = registerSelfMetric(&MysqldDatadirUsedRatioSelfMetric{})

// MysqldLoadAvgSelfMetric stands for the load average per cpu, on the MySQL host.
// It carries no state of its own.
type MysqldLoadAvgSelfMetric struct{}

// Name returns the metric's name, MysqldLoadAvgMetricName.
func (m *MysqldLoadAvgSelfMetric) Name() MetricName {
	return MysqldLoadAvgMetricName
}

// DefaultScope returns SelfScope.
func (m *MysqldLoadAvgSelfMetric) DefaultScope() Scope {
	return SelfScope
}

// DefaultThreshold returns 1.0 as the threshold to use when none is configured.
func (m *MysqldLoadAvgSelfMetric) DefaultThreshold() float64 {
	return 1.0
}

// RequiresConn reports false: the value is obtained through getMysqlHostMetric,
// which does not use params.Conn.
func (m *MysqldLoadAvgSelfMetric) RequiresConn() bool {
	return false
}

// Read returns the "loadavg" entry of the MySQL host metrics, as produced by getMysqlHostMetric.
func (m *MysqldLoadAvgSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
	return getMysqlHostMetric(ctx, params, "loadavg")
}

// MysqldDatadirUsedRatioSelfMetric stands for the disk space usage of the mount where MySQL's datadir is located.
// Range: 0.0 (empty) - 1.0 (full)
// It carries no state of its own.
type MysqldDatadirUsedRatioSelfMetric struct{}

// Name returns the metric's name, MysqldDatadirUsedRatioMetricName.
func (m *MysqldDatadirUsedRatioSelfMetric) Name() MetricName {
	return MysqldDatadirUsedRatioMetricName
}

// DefaultScope returns SelfScope.
func (m *MysqldDatadirUsedRatioSelfMetric) DefaultScope() Scope {
	return SelfScope
}

// DefaultThreshold returns 0.98 as the threshold to use when none is configured.
func (m *MysqldDatadirUsedRatioSelfMetric) DefaultThreshold() float64 {
	return 0.98
}

// RequiresConn reports false: the value is obtained through getMysqlHostMetric,
// which does not use params.Conn.
func (m *MysqldDatadirUsedRatioSelfMetric) RequiresConn() bool {
	return false
}

// Read returns the "datadir-used-ratio" entry of the MySQL host metrics, as produced by getMysqlHostMetric.
func (m *MysqldDatadirUsedRatioSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
	return getMysqlHostMetric(ctx, params, "datadir-used-ratio")
}
Loading
Loading