Skip to content

Commit

Permalink
resource_control: fetch cpu quota metrics from store instead of prometheus (#49176) (#49255)
Browse files Browse the repository at this point in the history

close #49174
  • Loading branch information
ti-chi-bot committed Dec 8, 2023
1 parent a53a963 commit 6c98485
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 44 deletions.
169 changes: 148 additions & 21 deletions executor/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,30 @@
package executor

import (
"bufio"
"context"
"encoding/base64"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/duration"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -121,6 +130,13 @@ const (
// duration Indicates the supported calibration duration
maxDuration = time.Hour * 24
minDuration = time.Minute

// serverTypeTiDB is tidb's instance type name
serverTypeTiDB = "tidb"
// serverTypeTiKV is tikv's instance type name
serverTypeTiKV = "tikv"
// serverTypeTiFlash is tiflash's instance type name
serverTypeTiFlash = "tiflash"
)

type calibrateResourceExec struct {
Expand Down Expand Up @@ -187,33 +203,39 @@ func (e *calibrateResourceExec) Next(ctx context.Context, req *chunk.Chunk) erro
return nil
}
e.done = true

exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.optionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
return e.dynamicCalibrate(ctx, req)
}
return e.staticCalibrate(ctx, req, exec)
return e.staticCalibrate(req)
}

var (
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error {
exec := e.ctx.(sqlexec.RestrictedSQLExecutor)
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
}
serverInfos, err := infoschema.GetClusterServerInfo(e.ctx)
if err != nil {
return err
}
startTime := startTs.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")
endTime := endTs.In(e.ctx.GetSessionVars().Location()).Format("2006-01-02 15:04:05")

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand Down Expand Up @@ -277,20 +299,21 @@ func (e *calibrateResourceExec) dynamicCalibrate(ctx context.Context, req *chunk
return nil
}

func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
func (e *calibrateResourceExec) staticCalibrate(req *chunk.Chunk) error {
// first fetch the ru settings config.
if resourceGroupCtl == nil {
return errors.New("resource group controller is not initialized")
}
clusterInfo, err := infoschema.GetClusterServerInfo(e.ctx)
if err != nil {
return err
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand All @@ -304,8 +327,8 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.
return errors.Errorf("unknown workload '%T'", e.workloadType)
}

if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio
if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio
}
ruCfg := resourceGroupCtl.GetConfig()
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
Expand All @@ -318,14 +341,27 @@ func (e *calibrateResourceExec) staticCalibrate(ctx context.Context, req *chunk.
return nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
// getTiDBTotalCPUQuota returns the total CPU quota across all TiDB instances
// in the cluster: the GOMAXPROCS of this instance multiplied by the number of
// TiDB servers reported in clusterInfo.
func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
	perInstance := float64(runtime.GOMAXPROCS(0))
	// Test hook: let tests override the per-instance CPU count.
	failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) {
		if val != nil {
			perInstance = float64(val.(int))
		}
	})
	return perInstance * float64(count(clusterInfo, serverTypeTiDB)), nil
}

func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
// getTiKVTotalCPUQuota returns the total CPU quota across all TiKV stores.
// It reads the quota metric from a single store and scales by the store
// count, so it assumes every TiKV store is configured with the same quota.
func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
	tikvNum := count(clusterInfo, serverTypeTiKV)
	if tikvNum == 0 {
		return 0.0, errors.New("no server with type 'tikv' is found")
	}
	perStore, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota")
	if err != nil {
		return 0.0, err
	}
	return perStore * float64(tikvNum), nil
}

type timePointValue struct {
Expand Down Expand Up @@ -403,3 +439,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql
}
return &timeSeriesValues{idx: 0, vals: ret}, nil
}

// count returns how many servers in clusterInfo have the server type ty.
func count(clusterInfo []infoschema.ServerInfo, ty string) int {
	total := 0
	for i := range clusterInfo {
		if clusterInfo[i].ServerType != ty {
			continue
		}
		total++
	}
	return total
}

// fetchServerCPUQuota reads the Prometheus-format metrics text from one
// server of the given type and returns the value of the sample line whose
// metric name is exactly metricName. The expected format is:
//
//	tikv_server_cpu_cores_quota 8
//
// HELP/TYPE comment lines (starting with '#') are skipped. An error is
// returned when the request fails, the value cannot be parsed, or no sample
// line with that name is present.
func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) {
	var cpuQuota float64
	err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error {
		if resp.StatusCode != http.StatusOK {
			return errors.Errorf("request %s failed: %s", addr, resp.Status)
		}
		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			line := scanner.Text()
			if strings.HasPrefix(line, "#") {
				continue
			}
			// Require an exact name match: a bare prefix check would also
			// accept metrics that merely start with metricName (for example
			// "<metricName>_total") and parse the wrong value.
			fields := strings.Fields(line)
			if len(fields) < 2 || fields[0] != metricName {
				continue
			}
			quota, err := strconv.ParseFloat(fields[1], 64)
			if err == nil {
				cpuQuota = quota
			}
			return errors.Trace(err)
		}
		// Surface a read error distinctly from a genuinely missing metric.
		if err := scanner.Err(); err != nil {
			return errors.Trace(err)
		}
		return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr)
	})
	return cpuQuota, err
}

// fetchStoreMetrics issues an HTTP GET against the "/metrics" status endpoint
// of servers of the given type and hands the response to onResp. It contacts
// at most one server: once any server yields a response, onResp's result
// (possibly nil) is returned immediately. Servers whose request fails are
// skipped, and the first such error is kept as the fallback return value.
func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error {
	var firstErr error
	for _, srv := range serversInfo {
		// Only consider servers of the requested type that expose a status address.
		if srv.ServerType != serverType {
			continue
		}
		if len(srv.StatusAddr) == 0 {
			continue
		}
		url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr)
		req, err := http.NewRequest(http.MethodGet, url, nil)
		if err != nil {
			return err
		}
		var resp *http.Response
		// Test hook: serve a canned, base64-encoded metrics payload instead of
		// performing a real HTTP request.
		failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) {
			if val != nil {
				data, _ := base64.StdEncoding.DecodeString(val.(string))
				resp = &http.Response{
					StatusCode: http.StatusOK,
					Body: noopCloserWrapper{
						Reader: strings.NewReader(string(data)),
					},
				}
			}
		})
		if resp == nil {
			var err1 error
			// ignore false positive go line, can't use defer here because it's in a loop.
			//nolint:bodyclose
			resp, err1 = util.InternalHTTPClient().Do(req)
			if err1 != nil {
				if firstErr == nil {
					firstErr = err1
				}
				continue
			}
		}
		// The body is closed right after onResp runs (see nolint note above),
		// and the result of the first contacted server is final.
		err = onResp(srv.Address, resp)
		resp.Body.Close()
		return err
	}
	// No server produced a response: report the first request error, or a
	// "not found" error when there was no eligible server at all.
	if firstErr == nil {
		firstErr = errors.Errorf("no server with type '%s' is found", serverType)
	}
	return firstErr
}

// noopCloserWrapper adapts an io.Reader into an io.ReadCloser whose Close is
// a no-op. fetchStoreMetrics uses it to wrap a mocked metrics payload as an
// http.Response body.
type noopCloserWrapper struct {
	io.Reader
}

// Close implements io.Closer; it does nothing and always returns nil.
func (noopCloserWrapper) Close() error {
	return nil
}
59 changes: 36 additions & 23 deletions executor/calibrate_resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package executor_test

import (
"context"
"encoding/base64"
"encoding/json"
"strings"
"testing"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -76,12 +78,27 @@ func TestCalibrateResource(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(context.Background(), rs.NewChunk(nil))
require.ErrorContains(t, err, "query metric error: pd unavailable")
require.ErrorContains(t, err, "no server with type 'tikv' is found")

// error sql
_, err = tk.Exec("CALIBRATE RESOURCE WORKLOAD tpcc START_TIME '2020-02-12 10:35:00'")
require.Error(t, err)

// Mock for cluster info
// information_schema.cluster_config
instances := []string{
"pd,127.0.0.1:32379,127.0.0.1:32380,mock-version,mock-githash,0",
"tidb,127.0.0.1:34000,30080,mock-version,mock-githash,1001",
"tikv,127.0.0.1:30160,30180,mock-version,mock-githash,0",
"tikv,127.0.0.1:30161,30181,mock-version,mock-githash,0",
"tikv,127.0.0.1:30162,30182,mock-version,mock-githash,0",
}
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockClusterInfo", fpExpr))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockClusterInfo"))
}()

// Mock for metric table data.
fpName := "github.com/pingcap/tidb/executor/mockMetricsTableData"
require.NoError(t, failpoint.Enable(fpName, "return"))
Expand All @@ -95,40 +112,36 @@ func TestCalibrateResource(t *testing.T) {
return time
}

metricsData := `# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 49943
# HELP tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server
# TYPE tikv_server_cpu_cores_quota gauge
tikv_server_cpu_cores_quota 8
`
// failpoint doesn't support string contains whitespaces and newline
encodedData := base64.StdEncoding.EncodeToString([]byte(metricsData))
fpExpr = `return("` + encodedData + `")`
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockMetricsResponse", fpExpr))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockGOMAXPROCS", "return(40)"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockGOMAXPROCS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockMetricsResponse"))
}()
mockData := make(map[string][][]types.Datum)
ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData)
ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool {
return fpName == fpname
})
rs, err = tk.Exec("CALIBRATE RESOURCE")
require.NoError(t, err)
require.NotNil(t, rs)
err = rs.Next(ctx, rs.NewChunk(nil))
// because when mock metrics is empty, error is always `pd unavailable`, don't check detail.
require.ErrorContains(t, err, "There is no CPU quota metrics, query metric error: pd unavailable")

mockData["tikv_cpu_quota"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0),
}
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0),
types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0),
}

tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("69768"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD TPCC").Check(testkit.Rows("69768"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_WRITE").Check(testkit.Rows("55823"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_ONLY").Check(testkit.Rows("34926"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_WRITE_ONLY").Check(testkit.Rows("109776"))

// change total tidb cpu to less than tikv_cpu_quota
mockData["tidb_server_maxprocs"] = [][]types.Datum{
types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0),
}
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockGOMAXPROCS", "return(8)"))
tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38760"))

// construct data for dynamic calibrate
Expand Down

0 comments on commit 6c98485

Please sign in to comment.