Skip to content

Commit

Permalink
resource_control: fetch cpu quota metrics from store instead of prome…
Browse files Browse the repository at this point in the history
…theus (#49176)

close #49174
  • Loading branch information
glorv authored Dec 7, 2023
1 parent 6f72f88 commit 5db7c6a
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 81 deletions.
1 change: 1 addition & 0 deletions pkg/executor/internal/calibrateresource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn/staleread",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"@com_github_docker_go_units//:go-units",
Expand Down
210 changes: 178 additions & 32 deletions pkg/executor/internal/calibrateresource/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
package calibrateresource

import (
"bufio"
"context"
"encoding/base64"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
Expand All @@ -34,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -81,6 +89,15 @@ var (
}
)

const (
// serverTypeTiDB is tidb's instance type name
serverTypeTiDB = "tidb"
// serverTypeTiKV is tikv's instance type name
serverTypeTiKV = "tikv"
// serverTypeTiFlash is tiflash's instance type name
serverTypeTiFlash = "tiflash"
)

// the resource cost rate of a specified workload per 1 tikv cpu.
type baseResourceCost struct {
// represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu
Expand Down Expand Up @@ -235,43 +252,55 @@ func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}
e.done = true

exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.OptionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
return e.dynamicCalibrate(ctx, req)
}
return e.staticCalibrate(ctx, req, exec)
return e.staticCalibrate(req)
}

var (
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error {
exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
}
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, startTs, endTs)
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, startTs, endTs)
clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx())
if err != nil {
return err
}
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, clusterInfo, startTs, endTs)
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, clusterInfo, startTs, endTs)
if err1 != nil && err2 != nil {
return err1
}

req.AppendUint64(0, uint64(tidbQuota+tiflashQuota))
return nil
}

func (e *Executor) getTiDBQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) {
func (e *Executor) getTiDBQuota(
ctx context.Context,
exec sqlexec.RestrictedSQLExecutor,
serverInfos []infoschema.ServerInfo,
startTs, endTs time.Time,
) (float64, error) {
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos)
if err != nil {
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos)
if err != nil {
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand Down Expand Up @@ -368,12 +397,17 @@ func setupQuotas(quotas []float64) (float64, error) {
return sum / float64(upperBound-lowerBound), nil
}

func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) {
func (e *Executor) getTiFlashQuota(
ctx context.Context,
exec sqlexec.RestrictedSQLExecutor,
serverInfos []infoschema.ServerInfo,
startTs, endTs time.Time,
) (float64, error) {
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)

quotas := make([]float64, 0)
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec)
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(serverInfos)
if err != nil {
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand Down Expand Up @@ -407,25 +441,26 @@ func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedS
return setupQuotas(quotas)
}

func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
func (e *Executor) staticCalibrate(req *chunk.Chunk) error {
resourceGroupCtl := domain.GetDomain(e.Ctx()).ResourceGroupsController()
// first fetch the ru settings config.
if resourceGroupCtl == nil {
return errors.New("resource group controller is not initialized")
}
clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx())
if err != nil {
return err
}
ruCfg := resourceGroupCtl.GetConfig()
if e.WorkloadType == ast.TPCH10 {
return staticCalibrateTpch10(ctx, req, exec, ruCfg)
return staticCalibrateTpch10(req, clusterInfo, ruCfg)
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand All @@ -439,8 +474,8 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s
return errors.Errorf("unknown workload '%T'", e.WorkloadType)
}

if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio
if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio
}
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms
Expand All @@ -452,14 +487,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s
return nil
}

func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor, ruCfg *resourceControlClient.RUConfig) error {
func staticCalibrateTpch10(req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error {
// TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored.
// cpu usage: 105494.666484 / 20 / 20 = 263.74
// read bytes: 401799161689.0 / 20 / 20 = 1004497904.22
const cpuTimePerCPUPerSec float64 = 263.74
const readBytesPerCPUPerSec float64 = 1004497904.22
ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec)
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(clusterInfo)
if err != nil {
return err
}
Expand All @@ -468,19 +503,39 @@ func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.R
return nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
cpuQuota := float64(runtime.GOMAXPROCS(0))
failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) {
if val != nil {
cpuQuota = float64(val.(int))
}
})
instanceNum := count(clusterInfo, serverTypeTiDB)
return cpuQuota * float64(instanceNum), nil
}

func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
instanceNum := count(clusterInfo, serverTypeTiKV)
if instanceNum == 0 {
return 0.0, errors.New("no server with type 'tikv' is found")
}
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota")
if err != nil {
return 0.0, err
}
return cpuQuota * float64(instanceNum), nil
}

func getTiFlashLogicalCores(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tiflash_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tiflash_cpu_quota")
func getTiFlashLogicalCores(clusterInfo []infoschema.ServerInfo) (float64, error) {
instanceNum := count(clusterInfo, serverTypeTiFlash)
if instanceNum == 0 {
return 0.0, nil
}
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiFlash, "tiflash_proxy_tikv_server_cpu_cores_quota")
if err != nil {
return 0.0, err
}
return cpuQuota * float64(instanceNum), nil
}

func getTiFlashRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
Expand Down Expand Up @@ -568,3 +623,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql
}
return &timeSeriesValues{idx: 0, vals: ret}, nil
}

func count(clusterInfo []infoschema.ServerInfo, ty string) int {
num := 0
for _, e := range clusterInfo {
if e.ServerType == ty {
num++
}
}
return num
}

func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) {
var cpuQuota float64
err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
return errors.Errorf("request %s failed: %s", addr, resp.Status)
}
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, metricName) {
continue
}
// the metrics format is like following:
// tikv_server_cpu_cores_quota 8
quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64)
if err == nil {
cpuQuota = quota
}
return errors.Trace(err)
}
return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr)
})
return cpuQuota, err
}

func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error {
var firstErr error
for _, srv := range serversInfo {
if srv.ServerType != serverType {
continue
}
if len(srv.StatusAddr) == 0 {
continue
}
url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
var resp *http.Response
failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) {
if val != nil {
data, _ := base64.StdEncoding.DecodeString(val.(string))
resp = &http.Response{
StatusCode: http.StatusOK,
Body: noopCloserWrapper{
Reader: strings.NewReader(string(data)),
},
}
}
})
if resp == nil {
var err1 error
// ignore false positive go line, can't use defer here because it's in a loop.
//nolint:bodyclose
resp, err1 = util.InternalHTTPClient().Do(req)
if err1 != nil {
if firstErr == nil {
firstErr = err1
}
continue
}
}
err = onResp(srv.Address, resp)
resp.Body.Close()
return err
}
if firstErr == nil {
firstErr = errors.Errorf("no server with type '%s' is found", serverType)
}
return firstErr
}

type noopCloserWrapper struct {
io.Reader
}

func (noopCloserWrapper) Close() error {
return nil
}
Loading

0 comments on commit 5db7c6a

Please sign in to comment.