Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_control: fetch cpu quota metrics from store instead of prometheus #49176

Merged
merged 10 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/internal/calibrateresource/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn/staleread",
"//pkg/util",
"//pkg/util/chunk",
"//pkg/util/sqlexec",
"@com_github_docker_go_units//:go-units",
Expand Down
210 changes: 178 additions & 32 deletions pkg/executor/internal/calibrateresource/calibrate_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,17 @@
package calibrateresource

import (
"bufio"
"context"
"encoding/base64"
"fmt"
"io"
"math"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"time"

"github.com/docker/go-units"
Expand All @@ -34,6 +41,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn/staleread"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -81,6 +89,15 @@ var (
}
)

const (
// serverTypeTiDB is tidb's instance type name
serverTypeTiDB = "tidb"
// serverTypeTiKV is tikv's instance type name
serverTypeTiKV = "tikv"
// serverTypeTiFlash is tiflash's instance type name
serverTypeTiFlash = "tiflash"
)

// the resource cost rate of a specified workload per 1 tikv cpu.
type baseResourceCost struct {
// represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu
Expand Down Expand Up @@ -235,43 +252,55 @@ func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}
e.done = true

exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers)
if len(e.OptionList) > 0 {
return e.dynamicCalibrate(ctx, req, exec)
return e.dynamicCalibrate(ctx, req)
}
return e.staticCalibrate(ctx, req, exec)
return e.staticCalibrate(req)
}

var (
errLowUsage = errors.Errorf("The workload in selected time window is too low, with which TiDB is unable to reach a capacity estimation; please select another time window with higher workload, or calibrate resource by hardware instead")
errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v")
)

func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error {
exec := e.Ctx().(sqlexec.RestrictedSQLExecutor)
startTs, endTs, err := e.parseCalibrateDuration(ctx)
if err != nil {
return err
}
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, startTs, endTs)
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, startTs, endTs)
clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx())
if err != nil {
return err
}
tidbQuota, err1 := e.getTiDBQuota(ctx, exec, clusterInfo, startTs, endTs)
tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, clusterInfo, startTs, endTs)
if err1 != nil && err2 != nil {
return err1
}

req.AppendUint64(0, uint64(tidbQuota+tiflashQuota))
return nil
}

func (e *Executor) getTiDBQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) {
func (e *Executor) getTiDBQuota(
ctx context.Context,
exec sqlexec.RestrictedSQLExecutor,
serverInfos []infoschema.ServerInfo,
startTs, endTs time.Time,
) (float64, error) {
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos)
if err != nil {
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos)
if err != nil {
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand Down Expand Up @@ -368,12 +397,17 @@ func setupQuotas(quotas []float64) (float64, error) {
return sum / float64(upperBound-lowerBound), nil
}

func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) {
func (e *Executor) getTiFlashQuota(
ctx context.Context,
exec sqlexec.RestrictedSQLExecutor,
serverInfos []infoschema.ServerInfo,
startTs, endTs time.Time,
) (float64, error) {
startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)
endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime)

quotas := make([]float64, 0)
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec)
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(serverInfos)
if err != nil {
return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand Down Expand Up @@ -407,25 +441,26 @@ func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedS
return setupQuotas(quotas)
}

func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error {
if !variable.EnableResourceControl.Load() {
return infoschema.ErrResourceGroupSupportDisabled
}
func (e *Executor) staticCalibrate(req *chunk.Chunk) error {
resourceGroupCtl := domain.GetDomain(e.Ctx()).ResourceGroupsController()
// first fetch the ru settings config.
if resourceGroupCtl == nil {
return errors.New("resource group controller is not initialized")
}
clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx())
if err != nil {
return err
}
ruCfg := resourceGroupCtl.GetConfig()
if e.WorkloadType == ast.TPCH10 {
return staticCalibrateTpch10(ctx, req, exec, ruCfg)
return staticCalibrateTpch10(req, clusterInfo, ruCfg)
}

totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec)
totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec)
totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo)
if err != nil {
return errNoCPUQuotaMetrics.FastGenByArgs(err.Error())
}
Expand All @@ -439,8 +474,8 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s
return errors.Errorf("unknown workload '%T'", e.WorkloadType)
}

if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio
if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota {
totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio
}
ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) +
float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms
Expand All @@ -452,14 +487,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s
return nil
}

func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor, ruCfg *resourceControlClient.RUConfig) error {
func staticCalibrateTpch10(req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error {
// TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored.
// cpu usage: 105494.666484 / 20 / 20 = 263.74
// read bytes: 401799161689.0 / 20 / 20 = 1004497904.22
const cpuTimePerCPUPerSec float64 = 263.74
const readBytesPerCPUPerSec float64 = 1004497904.22
ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec)
totalTiFlashLogicalCores, err := getTiFlashLogicalCores(clusterInfo)
if err != nil {
return err
}
Expand All @@ -468,19 +503,39 @@ func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.R
return nil
}

func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota")
func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
cpuQuota := float64(runtime.GOMAXPROCS(0))
failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) {
if val != nil {
cpuQuota = float64(val.(int))
}
})
instanceNum := count(clusterInfo, serverTypeTiDB)
return cpuQuota * float64(instanceNum), nil
}

func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs")
func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) {
instanceNum := count(clusterInfo, serverTypeTiKV)
if instanceNum == 0 {
return 0.0, errors.New("no server with type 'tikv' is found")
}
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota")
if err != nil {
return 0.0, err
}
return cpuQuota * float64(instanceNum), nil
}

func getTiFlashLogicalCores(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) {
query := "SELECT SUM(value) FROM METRICS_SCHEMA.tiflash_cpu_quota GROUP BY time ORDER BY time desc limit 1"
return getNumberFromMetrics(ctx, exec, query, "tiflash_cpu_quota")
func getTiFlashLogicalCores(clusterInfo []infoschema.ServerInfo) (float64, error) {
instanceNum := count(clusterInfo, serverTypeTiFlash)
if instanceNum == 0 {
return 0.0, nil
}
cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiFlash, "tiflash_proxy_tikv_server_cpu_cores_quota")
if err != nil {
return 0.0, err
}
return cpuQuota * float64(instanceNum), nil
}

func getTiFlashRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) {
Expand Down Expand Up @@ -568,3 +623,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql
}
return &timeSeriesValues{idx: 0, vals: ret}, nil
}

func count(clusterInfo []infoschema.ServerInfo, ty string) int {
num := 0
for _, e := range clusterInfo {
if e.ServerType == ty {
num++
}
}
return num
}

func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) {
var cpuQuota float64
err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error {
if resp.StatusCode != http.StatusOK {
return errors.Errorf("request %s failed: %s", addr, resp.Status)
}
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Text()
if !strings.HasPrefix(line, metricName) {
continue
}
// the metrics format is like following:
// tikv_server_cpu_cores_quota 8
quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64)
if err == nil {
cpuQuota = quota
}
return errors.Trace(err)
}
return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr)
})
return cpuQuota, err
}

func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error {
var firstErr error
for _, srv := range serversInfo {
if srv.ServerType != serverType {
continue
}
if len(srv.StatusAddr) == 0 {
continue
}
url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
var resp *http.Response
failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) {
if val != nil {
data, _ := base64.StdEncoding.DecodeString(val.(string))
resp = &http.Response{
StatusCode: http.StatusOK,
Body: noopCloserWrapper{
Reader: strings.NewReader(string(data)),
},
}
}
})
if resp == nil {
var err1 error
// ignore false positive go line, can't use defer here because it's in a loop.
//nolint:bodyclose
resp, err1 = util.InternalHTTPClient().Do(req)
if err1 != nil {
if firstErr == nil {
firstErr = err1
}
continue
}
}
err = onResp(srv.Address, resp)
resp.Body.Close()
return err
}
if firstErr == nil {
firstErr = errors.Errorf("no server with type '%s' is found", serverType)
}
return firstErr
}

type noopCloserWrapper struct {
io.Reader
}

func (noopCloserWrapper) Close() error {
return nil
}
Loading