From 5db7c6aab54809a2e15a9387e48e125fe1dfe4ca Mon Sep 17 00:00:00 2001 From: glorv Date: Thu, 7 Dec 2023 17:26:51 +0800 Subject: [PATCH] resource_control: fetch cpu quota metrics from store instead of prometheus (#49176) close pingcap/tidb#49174 --- .../internal/calibrateresource/BUILD.bazel | 1 + .../calibrateresource/calibrate_resource.go | 210 +++++++++++++++--- .../calibrate_resource_test.go | 96 ++++---- 3 files changed, 226 insertions(+), 81 deletions(-) diff --git a/pkg/executor/internal/calibrateresource/BUILD.bazel b/pkg/executor/internal/calibrateresource/BUILD.bazel index 579adff6fceab..c7ecb061ff33d 100644 --- a/pkg/executor/internal/calibrateresource/BUILD.bazel +++ b/pkg/executor/internal/calibrateresource/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/sessiontxn/staleread", + "//pkg/util", "//pkg/util/chunk", "//pkg/util/sqlexec", "@com_github_docker_go_units//:go-units", diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource.go b/pkg/executor/internal/calibrateresource/calibrate_resource.go index 1c8ddcd76fa82..0d3a34c60f0ee 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource.go @@ -15,10 +15,17 @@ package calibrateresource import ( + "bufio" "context" + "encoding/base64" "fmt" + "io" "math" + "net/http" + "runtime" "sort" + "strconv" + "strings" "time" "github.com/docker/go-units" @@ -34,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn/staleread" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/tikv/client-go/v2/oracle" @@ -81,6 +89,15 @@ var ( } ) +const ( + // serverTypeTiDB is tidb's instance type name + serverTypeTiDB = "tidb" + // serverTypeTiKV is tikv's instance type name + serverTypeTiKV = "tikv" + // serverTypeTiFlash is tiflash's instance type name + serverTypeTiFlash = "tiflash" +) + // the resource cost rate of a specified workload per 1 tikv cpu. type baseResourceCost struct { // represents the average ratio of TiDB CPU time to TiKV CPU time, this is used to calculate whether tikv cpu @@ -235,13 +252,14 @@ func (e *Executor) Next(ctx context.Context, req *chunk.Chunk) error { return nil } e.done = true - - exec := e.Ctx().(sqlexec.RestrictedSQLExecutor) + if !variable.EnableResourceControl.Load() { + return infoschema.ErrResourceGroupSupportDisabled + } ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) if len(e.OptionList) > 0 { - return e.dynamicCalibrate(ctx, req, exec) + return e.dynamicCalibrate(ctx, req) } - return e.staticCalibrate(ctx, req, exec) + return e.staticCalibrate(req) } var ( @@ -249,29 +267,40 @@ var ( errNoCPUQuotaMetrics = errors.Normalize("There is no CPU quota metrics, %v") ) -func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { +func (e *Executor) dynamicCalibrate(ctx context.Context, req *chunk.Chunk) error { + exec := e.Ctx().(sqlexec.RestrictedSQLExecutor) startTs, endTs, err := e.parseCalibrateDuration(ctx) if err != nil { return err } - tidbQuota, err1 := e.getTiDBQuota(ctx, exec, startTs, endTs) - tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, startTs, endTs) + clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx()) + if err != nil { + return err + } + tidbQuota, err1 := e.getTiDBQuota(ctx, exec, clusterInfo, startTs, endTs) + tiflashQuota, err2 := e.getTiFlashQuota(ctx, exec, clusterInfo, startTs, endTs) if err1 != nil && err2 != nil { return err1 } + req.AppendUint64(0, uint64(tidbQuota+tiflashQuota)) return nil } -func (e *Executor) getTiDBQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) { +func (e *Executor) getTiDBQuota( + ctx context.Context, + exec sqlexec.RestrictedSQLExecutor, + serverInfos []infoschema.ServerInfo, + startTs, endTs time.Time, +) (float64, error) { startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + totalTiDBCPU, err := getTiDBTotalCPUQuota(serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -368,12 +397,17 @@ func setupQuotas(quotas []float64) (float64, error) { return sum / float64(upperBound-lowerBound), nil } -func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, startTs, endTs time.Time) (float64, error) { +func (e *Executor) getTiFlashQuota( + ctx context.Context, + exec sqlexec.RestrictedSQLExecutor, + serverInfos []infoschema.ServerInfo, + startTs, endTs time.Time, +) (float64, error) { startTime := startTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) endTime := endTs.In(e.Ctx().GetSessionVars().Location()).Format(time.DateTime) quotas := make([]float64, 0) - totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec) + totalTiFlashLogicalCores, err := getTiFlashLogicalCores(serverInfos) if err != nil { return 0, errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -407,25 +441,26 @@ func (e *Executor) getTiFlashQuota(ctx context.Context, exec sqlexec.RestrictedS return setupQuotas(quotas) } -func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor) error { - if !variable.EnableResourceControl.Load() { - return infoschema.ErrResourceGroupSupportDisabled - } +func (e *Executor) staticCalibrate(req *chunk.Chunk) error { resourceGroupCtl := domain.GetDomain(e.Ctx()).ResourceGroupsController() // first fetch the ru settings config. if resourceGroupCtl == nil { return errors.New("resource group controller is not initialized") } + clusterInfo, err := infoschema.GetClusterServerInfo(e.Ctx()) + if err != nil { + return err + } ruCfg := resourceGroupCtl.GetConfig() if e.WorkloadType == ast.TPCH10 { - return staticCalibrateTpch10(ctx, req, exec, ruCfg) + return staticCalibrateTpch10(req, clusterInfo, ruCfg) } - totalKVCPUQuota, err := getTiKVTotalCPUQuota(ctx, exec) + totalKVCPUQuota, err := getTiKVTotalCPUQuota(clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } - totalTiDBCPU, err := getTiDBTotalCPUQuota(ctx, exec) + totalTiDBCPUQuota, err := getTiDBTotalCPUQuota(clusterInfo) if err != nil { return errNoCPUQuotaMetrics.FastGenByArgs(err.Error()) } @@ -439,8 +474,8 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s return errors.Errorf("unknown workload '%T'", e.WorkloadType) } - if totalTiDBCPU/baseCost.tidbToKVCPURatio < totalKVCPUQuota { - totalKVCPUQuota = totalTiDBCPU / baseCost.tidbToKVCPURatio + if totalTiDBCPUQuota/baseCost.tidbToKVCPURatio < totalKVCPUQuota { + totalKVCPUQuota = totalTiDBCPUQuota / baseCost.tidbToKVCPURatio } ruPerKVCPU := float64(ruCfg.ReadBaseCost)*float64(baseCost.readReqCount) + float64(ruCfg.CPUMsCost)*baseCost.kvCPU*1000 + // convert to ms @@ -452,14 +487,14 @@ func (e *Executor) staticCalibrate(ctx context.Context, req *chunk.Chunk, exec s return nil } -func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.RestrictedSQLExecutor, ruCfg *resourceControlClient.RUConfig) error { +func staticCalibrateTpch10(req *chunk.Chunk, clusterInfo []infoschema.ServerInfo, ruCfg *resourceControlClient.RUConfig) error { // TPCH10 only considers the resource usage of the TiFlash including cpu and read bytes. Others are ignored. // cpu usage: 105494.666484 / 20 / 20 = 263.74 // read bytes: 401799161689.0 / 20 / 20 = 1004497904.22 const cpuTimePerCPUPerSec float64 = 263.74 const readBytesPerCPUPerSec float64 = 1004497904.22 ruPerCPU := float64(ruCfg.CPUMsCost)*cpuTimePerCPUPerSec + float64(ruCfg.ReadBytesCost)*readBytesPerCPUPerSec - totalTiFlashLogicalCores, err := getTiFlashLogicalCores(ctx, exec) + totalTiFlashLogicalCores, err := getTiFlashLogicalCores(clusterInfo) if err != nil { return err } @@ -468,19 +503,39 @@ func staticCalibrateTpch10(ctx context.Context, req *chunk.Chunk, exec sqlexec.R return nil } -func getTiKVTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tikv_cpu_quota GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tikv_cpu_quota") +func getTiDBTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) { + cpuQuota := float64(runtime.GOMAXPROCS(0)) + failpoint.Inject("mockGOMAXPROCS", func(val failpoint.Value) { + if val != nil { + cpuQuota = float64(val.(int)) + } + }) + instanceNum := count(clusterInfo, serverTypeTiDB) + return cpuQuota * float64(instanceNum), nil } -func getTiDBTotalCPUQuota(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tidb_server_maxprocs GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tidb_server_maxprocs") +func getTiKVTotalCPUQuota(clusterInfo []infoschema.ServerInfo) (float64, error) { + instanceNum := count(clusterInfo, serverTypeTiKV) + if instanceNum == 0 { + return 0.0, errors.New("no server with type 'tikv' is found") + } + cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiKV, "tikv_server_cpu_cores_quota") + if err != nil { + return 0.0, err + } + return cpuQuota * float64(instanceNum), nil } -func getTiFlashLogicalCores(ctx context.Context, exec sqlexec.RestrictedSQLExecutor) (float64, error) { - query := "SELECT SUM(value) FROM METRICS_SCHEMA.tiflash_cpu_quota GROUP BY time ORDER BY time desc limit 1" - return getNumberFromMetrics(ctx, exec, query, "tiflash_cpu_quota") +func getTiFlashLogicalCores(clusterInfo []infoschema.ServerInfo) (float64, error) { + instanceNum := count(clusterInfo, serverTypeTiFlash) + if instanceNum == 0 { + return 0.0, nil + } + cpuQuota, err := fetchServerCPUQuota(clusterInfo, serverTypeTiFlash, "tiflash_proxy_tikv_server_cpu_cores_quota") + if err != nil { + return 0.0, err + } + return cpuQuota * float64(instanceNum), nil } func getTiFlashRUPerSec(ctx context.Context, sctx sessionctx.Context, exec sqlexec.RestrictedSQLExecutor, startTime, endTime string) (*timeSeriesValues, error) { @@ -568,3 +623,94 @@ func getValuesFromMetrics(ctx context.Context, sctx sessionctx.Context, exec sql } return &timeSeriesValues{idx: 0, vals: ret}, nil } + +func count(clusterInfo []infoschema.ServerInfo, ty string) int { + num := 0 + for _, e := range clusterInfo { + if e.ServerType == ty { + num++ + } + } + return num +} + +func fetchServerCPUQuota(serverInfos []infoschema.ServerInfo, serverType string, metricName string) (float64, error) { + var cpuQuota float64 + err := fetchStoreMetrics(serverInfos, serverType, func(addr string, resp *http.Response) error { + if resp.StatusCode != http.StatusOK { + return errors.Errorf("request %s failed: %s", addr, resp.Status) + } + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, metricName) { + continue + } + // the metrics format is like following: + // tikv_server_cpu_cores_quota 8 + quota, err := strconv.ParseFloat(line[len(metricName)+1:], 64) + if err == nil { + cpuQuota = quota + } + return errors.Trace(err) + } + return errors.Errorf("metrics '%s' not found from server '%s'", metricName, addr) + }) + return cpuQuota, err +} + +func fetchStoreMetrics(serversInfo []infoschema.ServerInfo, serverType string, onResp func(string, *http.Response) error) error { + var firstErr error + for _, srv := range serversInfo { + if srv.ServerType != serverType { + continue + } + if len(srv.StatusAddr) == 0 { + continue + } + url := fmt.Sprintf("%s://%s/metrics", util.InternalHTTPSchema(), srv.StatusAddr) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + var resp *http.Response + failpoint.Inject("mockMetricsResponse", func(val failpoint.Value) { + if val != nil { + data, _ := base64.StdEncoding.DecodeString(val.(string)) + resp = &http.Response{ + StatusCode: http.StatusOK, + Body: noopCloserWrapper{ + Reader: strings.NewReader(string(data)), + }, + } + } + }) + if resp == nil { + var err1 error + // ignore false positive go line, can't use defer here because it's in a loop. + //nolint:bodyclose + resp, err1 = util.InternalHTTPClient().Do(req) + if err1 != nil { + if firstErr == nil { + firstErr = err1 + } + continue + } + } + err = onResp(srv.Address, resp) + resp.Body.Close() + return err + } + if firstErr == nil { + firstErr = errors.Errorf("no server with type '%s' is found", serverType) + } + return firstErr +} + +type noopCloserWrapper struct { + io.Reader +} + +func (noopCloserWrapper) Close() error { + return nil +} diff --git a/pkg/executor/internal/calibrateresource/calibrate_resource_test.go b/pkg/executor/internal/calibrateresource/calibrate_resource_test.go index 6fb9467454c7e..c0030b004ca08 100644 --- a/pkg/executor/internal/calibrateresource/calibrate_resource_test.go +++ b/pkg/executor/internal/calibrateresource/calibrate_resource_test.go @@ -17,7 +17,9 @@ package calibrateresource_test import ( "bytes" "context" + "encoding/base64" "encoding/json" + "strings" "testing" "time" @@ -73,12 +75,27 @@ func TestCalibrateResource(t *testing.T) { require.NoError(t, err) require.NotNil(t, rs) err = rs.Next(context.Background(), rs.NewChunk(nil)) - require.ErrorContains(t, err, "query metric error: pd unavailable") + require.ErrorContains(t, err, "no server with type 'tikv' is found") // error sql _, err = tk.Exec("CALIBRATE RESOURCE WORKLOAD tpcc START_TIME '2020-02-12 10:35:00'") require.Error(t, err) + // Mock for cluster info + // information_schema.cluster_config + instances := []string{ + "pd,127.0.0.1:32379,127.0.0.1:32380,mock-version,mock-githash,0", + "tidb,127.0.0.1:34000,30080,mock-version,mock-githash,1001", + "tikv,127.0.0.1:30160,30180,mock-version,mock-githash,0", + "tikv,127.0.0.1:30161,30181,mock-version,mock-githash,0", + "tikv,127.0.0.1:30162,30182,mock-version,mock-githash,0", + } + fpExpr := `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo", fpExpr)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo")) + }() + // Mock for metric table data. fpName := "github.com/pingcap/tidb/pkg/executor/mockMetricsTableData" require.NoError(t, failpoint.Enable(fpName, "return")) @@ -99,30 +116,34 @@ func TestCalibrateResource(t *testing.T) { return time } + metricsData := `# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds. +# TYPE process_cpu_seconds_total counter +process_cpu_seconds_total 49943 +# HELP tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server +# TYPE tikv_server_cpu_cores_quota gauge +tikv_server_cpu_cores_quota 8 +# HELP tiflash_proxy_tikv_scheduler_write_flow The write flow passed through at scheduler level. +# TYPE tiflash_proxy_tikv_scheduler_write_flow gauge +tiflash_proxy_tikv_scheduler_write_flow 0 +# HELP tiflash_proxy_tikv_server_cpu_cores_quota Total CPU cores quota for TiKV server +# TYPE tiflash_proxy_tikv_server_cpu_cores_quota gauge +tiflash_proxy_tikv_server_cpu_cores_quota 20 +` + // failpoint doesn't support string contains whitespaces and newline + encodedData := base64.StdEncoding.EncodeToString([]byte(metricsData)) + fpExpr = `return("` + encodedData + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockMetricsResponse", fpExpr)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockGOMAXPROCS", "return(40)")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockGOMAXPROCS")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockMetricsResponse")) + }() mockData := make(map[string][][]types.Datum) ctx := context.WithValue(context.Background(), "__mockMetricsTableData", mockData) ctx = failpoint.WithHook(ctx, func(_ context.Context, fpname string) bool { return fpName == fpname }) - rs, err = tk.Exec("CALIBRATE RESOURCE") - require.NoError(t, err) - require.NotNil(t, rs) - err = rs.Next(ctx, rs.NewChunk(nil)) - // because when mock metrics is empty, error is always `pd unavailable`, don't check detail. - require.ErrorContains(t, err, "There is no CPU quota metrics, query metric error: pd unavailable") - - mockData["tikv_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-0", 8.0), - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-1", 8.0), - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tikv-2", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-0", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-1", 8.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tikv-2", 8.0), - } - mockData["tidb_server_maxprocs"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 40.0), - types.MakeDatums(datetime("2020-02-12 10:36:00"), "tidb-0", 40.0), - } + tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("69768")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD TPCC").Check(testkit.Rows("69768")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_READ_WRITE").Check(testkit.Rows("55823")) @@ -130,9 +151,7 @@ func TestCalibrateResource(t *testing.T) { tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE WORKLOAD OLTP_WRITE_ONLY").Check(testkit.Rows("109776")) // change total tidb cpu to less than tikv_cpu_quota - mockData["tidb_server_maxprocs"] = [][]types.Datum{ - types.MakeDatums(datetime("2020-02-12 10:35:00"), "tidb-0", 8.0), - } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/internal/calibrateresource/mockGOMAXPROCS", "return(8)")) tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE").Check(testkit.Rows("38760")) ru1 := [][]types.Datum{ @@ -707,19 +726,11 @@ func TestCalibrateResource(t *testing.T) { types.MakeDatums(datetime("2023-09-19 20:00:39.329000"), "127.0.0.1:10080", 20.0), } - mockData["tikv_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2023-09-19 19:50:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:51:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:52:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:53:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:54:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:55:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:56:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:57:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:58:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 19:59:39.330000"), "127.0.0.1:20180", 20.0), - types.MakeDatums(datetime("2023-09-19 20:00:39.330000"), "127.0.0.1:20180", 20.0), - } + // change mock for cluster info, add tiflash + instances = append(instances, "tiflash,127.0.0.1:3930,33940,mock-version,mock-githash,0") + fpExpr = `return("` + strings.Join(instances, ";") + `")` + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo", fpExpr)) + rs, err = tk.Exec("CALIBRATE RESOURCE START_TIME '2023-09-19 19:50:39' DURATION '10m'") require.NoError(t, err) require.NotNil(t, rs) @@ -754,19 +765,6 @@ func TestCalibrateResource(t *testing.T) { types.MakeDatums(datetime("2023-09-19 20:00:39.318000"), 659167.0340155548), } - mockData["tiflash_cpu_quota"] = [][]types.Datum{ - types.MakeDatums(datetime("2023-09-19 19:50:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:51:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:52:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:53:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:54:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:55:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:56:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:57:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:58:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 19:59:39.502000"), "127.0.0.1:8234 ", 20.0), - types.MakeDatums(datetime("2023-09-19 20:00:39.502000"), "127.0.0.1:8234 ", 20.0), - } tk.MustQueryWithContext(ctx, "CALIBRATE RESOURCE START_TIME '2023-09-19 19:50:39' DURATION '10m'").Check(testkit.Rows("729439")) delete(mockData, "process_cpu_usage")