Commit

*: add global sort related metric (pingcap#47485)
wjhuang2016 authored and ti-chi-bot committed Oct 17, 2023
1 parent ab4c20b commit f538e0e
Showing 12 changed files with 770 additions and 4 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/external/BUILD.bazel
@@ -28,6 +28,7 @@ go_library(
"//br/pkg/membuf",
"//br/pkg/storage",
"//pkg/kv",
"//pkg/metrics",
"//pkg/sessionctx/variable",
"//pkg/util/hack",
"//pkg/util/logutil",
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/external/engine.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/atomic"
"go.uber.org/zap"
@@ -197,12 +198,13 @@ func (e *Engine) loadIngestData(
hex.EncodeToString(start))
}

now := time.Now()
startTs := time.Now()
keys := make([][]byte, 0, 1024)
values := make([][]byte, 0, 1024)
memBuf := e.bufPool.NewBuffer()
cnt := 0
size := 0
totalSize := 0
largeRegion := e.regionSplitSize > 2*int64(config.SplitRegionSize)
ret := make([]common.DataAndRange, 0, 1)
curStart := start
@@ -215,6 +217,7 @@
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}

for iter.Next() {
@@ -241,13 +244,16 @@
values = append(values, memBuf.AddBytes(v))
cnt++
size += len(k) + len(v)
totalSize += len(k) + len(v)
}
if iter.Error() != nil {
return nil, errors.Trace(iter.Error())
}

metrics.GlobalSortReadFromCloudStorageRate.WithLabelValues("read_and_sort").Observe(float64(totalSize) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
metrics.GlobalSortReadFromCloudStorageDuration.WithLabelValues("read_and_sort").Observe(time.Since(startTs).Seconds())
logutil.Logger(ctx).Info("load data from external storage",
zap.Duration("cost time", time.Since(now)),
zap.Duration("cost time", time.Since(startTs)),
zap.Int("iterated count", cnt))
ret = append(ret, common.DataAndRange{
Data: e.buildIngestData(keys, values, memBuf),
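
The loadIngestData hunks above keep a running totalSize across every key/value pair pulled from external storage and, once iteration finishes, observe a throughput and a duration under the "read_and_sort" label. The observed rate is simply the bytes read converted to MiB divided by the seconds elapsed since startTs; a minimal sketch of that arithmetic (readRateMiBPerSec is an illustrative name, not part of the patch):

```go
package sketch

import "time"

// readRateMiBPerSec computes the value fed to
// GlobalSortReadFromCloudStorageRate: total bytes read, converted to MiB,
// divided by the wall-clock seconds elapsed since startTs.
func readRateMiBPerSec(totalSize int, startTs time.Time) float64 {
	return float64(totalSize) / 1024.0 / 1024.0 / time.Since(startTs).Seconds()
}
```
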
14 changes: 13 additions & 1 deletion br/pkg/lightning/backend/external/writer.go
@@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/membuf"
"github.com/pingcap/tidb/br/pkg/storage"
tidbkv "github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/size"
@@ -385,7 +386,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
}

ts := time.Now()
var savedBytes uint64
savedBytes := w.batchSize

startTs := time.Now()
var startTsForWrite time.Time

defer func() {
w.currentSeq++
@@ -407,6 +411,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
zap.Duration("time", time.Since(ts)),
zap.Uint64("bytes", savedBytes),
zap.Any("rate", float64(savedBytes)/1024.0/1024.0/time.Since(ts).Seconds()))
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("write").Observe(time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTsForWrite).Seconds())
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort_and_write").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort_and_write").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())
}()

sorty.MaxGor = min(8, uint64(variable.GetDDLReorgWorkerCounter()))
@@ -420,6 +428,10 @@ func (w *Writer) flushKVs(ctx context.Context, fromClose bool) (err error) {
return false
})

startTsForWrite = time.Now()
metrics.GlobalSortWriteToCloudStorageDuration.WithLabelValues("sort").Observe(time.Since(startTs).Seconds())
metrics.GlobalSortWriteToCloudStorageRate.WithLabelValues("sort").Observe(float64(savedBytes) / 1024.0 / 1024.0 / time.Since(startTs).Seconds())

w.kvStore, err = NewKeyValueStore(ctx, dataWriter, w.rc)
if err != nil {
return err
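
With the changes above, flushKVs times its two phases separately: startTs is taken before the in-memory sort, startTsForWrite right after it, and the deferred block reports the "write" and combined "sort_and_write" figures, while "sort" is observed as soon as sorting finishes. Reporting per-phase and end-to-end numbers makes it easy to see whether a slow flush is dominated by sorting or by the upload. A reduced sketch of that structure with stand-in names (writeDuration, writeRate, observe, and flushSketch are illustrative; the real code observes the metrics inline rather than through a helper):

```go
package sketch

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// Illustrative stand-ins for metrics.GlobalSortWriteToCloudStorage{Duration,Rate}.
var (
	writeDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "write_to_cloud_storage_duration",
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
	}, []string{"type"})
	writeRate = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "write_to_cloud_storage_rate",
		Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
	}, []string{"type"})
)

// observe records one duration/rate pair for the given phase.
func observe(phase string, savedBytes uint64, since time.Time) {
	secs := time.Since(since).Seconds()
	writeDuration.WithLabelValues(phase).Observe(secs)
	writeRate.WithLabelValues(phase).Observe(float64(savedBytes) / 1024.0 / 1024.0 / secs)
}

// flushSketch mirrors the phase structure added to flushKVs: time the
// in-memory sort, then the upload, and report per-phase plus combined figures.
func flushSketch(savedBytes uint64, sortFn, writeFn func()) {
	startTs := time.Now()
	var startTsForWrite time.Time
	defer func() {
		observe("write", savedBytes, startTsForWrite)
		observe("sort_and_write", savedBytes, startTs)
	}()

	sortFn() // sort the buffered KV pairs
	startTsForWrite = time.Now()
	observe("sort", savedBytes, startTs)

	writeFn() // write the sorted data to cloud storage
}
```
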
1 change: 1 addition & 0 deletions br/pkg/lightning/backend/local/BUILD.bazel
@@ -47,6 +47,7 @@ go_library(
"//pkg/distsql",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/metrics",
"//pkg/parser/model",
"//pkg/parser/mysql",
"//pkg/sessionctx/variable",
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/local/local.go
@@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/store/pdtypes"
"github.com/pingcap/tidb/pkg/tablecodec"
@@ -1322,6 +1323,7 @@ func (local *Backend) startWorker(
jobInCh, jobOutCh chan *regionJob,
jobWg *sync.WaitGroup,
) error {
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Set(0)
for {
select {
case <-ctx.Done():
@@ -1333,7 +1335,9 @@
return nil
}

metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Inc()
err := local.executeJob(ctx, job)
metrics.GlobalSortIngestWorkerCnt.WithLabelValues("execute job").Dec()
switch job.stage {
case regionScanned, wrote, ingested:
jobOutCh <- job
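
In local.go the GlobalSortIngestWorkerCnt gauge is zeroed when a worker loop starts, incremented just before executeJob, and decremented when the job returns, so the gauge tracks in-flight ingest jobs. A hedged sketch of that busy-worker pattern (workerBusy and runJob are illustrative names, and the Dec here is deferred rather than placed after the call as in the patch):

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

// workerBusy stands in for metrics.GlobalSortIngestWorkerCnt.
var workerBusy = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Name: "ingest_worker_cnt",
}, []string{"type"})

// runJob wraps one unit of work so the gauge reflects how many workers are
// currently executing a job.
func runJob(job func() error) error {
	workerBusy.WithLabelValues("execute job").Inc()
	defer workerBusy.WithLabelValues("execute job").Dec()
	return job()
}
```
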
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_operators.go
@@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
util2 "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
@@ -442,7 +443,7 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
for !done {
srcChk := w.getChunk()
done, err = fetchTableScanResult(w.ctx, w.copCtx.GetBase(), rs, srcChk)
if err != nil {
if err != nil || util2.IsContextDone(w.ctx) {
w.recycleChunk(srcChk)
terror.Call(rs.Close)
return err
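
Besides the metrics work, the scan loop in backfilling_operators.go now also stops when the worker context is canceled, via util2.IsContextDone. That helper's body is not part of this diff; a sketch of the usual shape of such a non-blocking check (the real implementation in pkg/ddl/util may differ):

```go
package sketch

import "context"

// isContextDone reports, without blocking, whether ctx has already been
// canceled or has timed out. Sketch only; the real util2.IsContextDone
// lives in pkg/ddl/util.
func isContextDone(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}
```
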
5 changes: 5 additions & 0 deletions pkg/ddl/index_cop.go
@@ -18,6 +18,7 @@ import (
"context"
"encoding/hex"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -146,6 +147,7 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
p.checkpointMgr.Register(task.id, task.endKey)
}
var done bool
startTime := time.Now()
for !done {
srcChk := p.getChunk()
done, err = fetchTableScanResult(p.ctx, p.copCtx.GetBase(), rs, srcChk)
@@ -158,10 +160,13 @@ func scanRecords(p *copReqSenderPool, task *reorgBackfillTask, se *sess.Session)
p.checkpointMgr.UpdateTotal(task.id, srcChk.NumRows(), done)
}
idxRs := IndexRecordChunk{ID: task.id, Chunk: srcChk, Done: done}
rate := float64(srcChk.MemoryUsage()) / 1024.0 / 1024.0 / time.Since(startTime).Seconds()
metrics.AddIndexScanRate.WithLabelValues(metrics.LblAddIndex).Observe(rate)
failpoint.Inject("mockCopSenderError", func() {
idxRs.Err = errors.New("mock cop error")
})
p.chunkSender.AddTask(idxRs)
startTime = time.Now()
}
terror.Call(rs.Close)
return nil
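
In index_cop.go every fetched chunk produces one AddIndexScanRate observation: the chunk's memory footprint in MiB divided by the time since the previous chunk was handed off, after which startTime is reset. A small sketch of that per-chunk measurement (observeScanRate is an illustrative helper, not part of the patch, and assumes InitDDLMetrics has already been called):

```go
package sketch

import (
	"time"

	"github.com/pingcap/tidb/pkg/metrics"
)

// observeScanRate reports how fast one chunk of scanned rows was produced:
// chunk size in MiB over the seconds elapsed since the previous hand-off.
// The caller resets its start time with the returned value.
func observeScanRate(chunkBytes int64, since time.Time) time.Time {
	rate := float64(chunkBytes) / 1024.0 / 1024.0 / time.Since(since).Seconds()
	metrics.AddIndexScanRate.WithLabelValues(metrics.LblAddIndex).Observe(rate)
	return time.Now()
}
```
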
1 change: 1 addition & 0 deletions pkg/metrics/BUILD.bazel
@@ -10,6 +10,7 @@ go_library(
"domain.go",
"executor.go",
"gc_worker.go",
"globalsort.go",
"import.go",
"log_backup.go",
"meta.go",
9 changes: 9 additions & 0 deletions pkg/metrics/ddl.go
@@ -60,6 +60,7 @@ var (
BackfillProgressGauge *prometheus.GaugeVec
DDLJobTableDuration *prometheus.HistogramVec
DDLRunningJobCount *prometheus.GaugeVec
AddIndexScanRate *prometheus.HistogramVec
)

// InitDDLMetrics initializes and defines DDL metrics.
@@ -165,6 +166,14 @@ func InitDDLMetrics() {
Name: "running_job_count",
Help: "Running DDL jobs count",
}, []string{LblType})

AddIndexScanRate = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Name: "scan_rate",
Help: "scan rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})
}

// Label constants.
Expand Down
72 changes: 72 additions & 0 deletions pkg/metrics/globalsort.go
@@ -0,0 +1,72 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import "github.com/prometheus/client_golang/prometheus"

var (
// GlobalSortWriteToCloudStorageDuration records the duration of writing to cloud storage.
GlobalSortWriteToCloudStorageDuration *prometheus.HistogramVec
// GlobalSortWriteToCloudStorageRate records the rate of writing to cloud storage.
GlobalSortWriteToCloudStorageRate *prometheus.HistogramVec
// GlobalSortReadFromCloudStorageDuration records the duration of reading from cloud storage.
GlobalSortReadFromCloudStorageDuration *prometheus.HistogramVec
// GlobalSortReadFromCloudStorageRate records the rate of reading from cloud storage.
GlobalSortReadFromCloudStorageRate *prometheus.HistogramVec
// GlobalSortIngestWorkerCnt records the working number of ingest workers.
GlobalSortIngestWorkerCnt *prometheus.GaugeVec
)

// InitGlobalSortMetrics initializes and defines global sort metrics.
func InitGlobalSortMetrics() {
GlobalSortWriteToCloudStorageDuration = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "write_to_cloud_storage_duration",
Help: "write to cloud storage duration",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 524s
}, []string{LblType})

GlobalSortWriteToCloudStorageRate = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "write_to_cloud_storage_rate",
Help: "write to cloud storage rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})

GlobalSortReadFromCloudStorageDuration = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "read_from_cloud_storage_duration",
Help: "read from cloud storage duration",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
}, []string{LblType})

GlobalSortReadFromCloudStorageRate = NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "read_from_cloud_storage_rate",
Help: "read from cloud storage rate",
Buckets: prometheus.ExponentialBuckets(0.05, 2, 20),
}, []string{LblType})

GlobalSortIngestWorkerCnt = NewGaugeVec(prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "global_sort",
Name: "ingest_worker_cnt",
Help: "ingest worker cnt",
}, []string{LblType})
}
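
The duration histograms in globalsort.go use ExponentialBuckets(0.001, 2, 20), i.e. 1 ms doubling up to roughly 524 s (as the inline comment notes), while the rate histograms start at 0.05 and double 19 times. The file follows the same convention as the rest of pkg/metrics: an Init function builds the collectors, which must then be registered before they export anything. How TiDB wires up that registration is not shown in this diff; a hedged, standalone sketch of exposing the new collectors over HTTP (the port and handler wiring are illustrative):

```go
package main

import (
	"net/http"

	"github.com/pingcap/tidb/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Build the global-sort collectors, then register them explicitly.
	// TiDB's own registration path is outside this diff; this is a sketch.
	metrics.InitGlobalSortMetrics()
	prometheus.MustRegister(
		metrics.GlobalSortWriteToCloudStorageDuration,
		metrics.GlobalSortWriteToCloudStorageRate,
		metrics.GlobalSortReadFromCloudStorageDuration,
		metrics.GlobalSortReadFromCloudStorageRate,
		metrics.GlobalSortIngestWorkerCnt,
	)

	// Serve the registered metrics for Prometheus to scrape.
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":2112", nil)
}
```
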
