Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: a centralized place to generate tipb report data #30781

Merged
merged 6 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1536,7 +1536,7 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan))
r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(), plancodec.DecodeNormalizedPlan)
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
Expand Down
2 changes: 1 addition & 1 deletion util/topsql/reporter/datasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type DataSink interface {
// TrySend pushes a report data into the sink, which will later be sent to a target by the sink. A deadline can be
// specified to control how late it should be sent. If the sink is kept full and cannot schedule a send within
// the specified deadline, or the sink is closed, an error will be returned.
TrySend(data ReportData, deadline time.Time) error
TrySend(data *ReportData, deadline time.Time) error

// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.
Expand Down
100 changes: 69 additions & 31 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/topsql/tracecpu"
"github.com/pingcap/tipb/go-tipb"
"github.com/wangjohn/quickselect"
atomic2 "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -132,6 +133,9 @@ type RemoteTopSQLReporter struct {

collectCPUDataChan chan cpuData
reportCollectedDataChan chan collectedData

// calling decodePlan can take a while, so it should not block critical paths
decodePlan planBinaryDecodeFunc
}

// SQLMeta is the SQL meta which contains the normalized SQL string and a bool field which is used to distinguish internal SQL.
Expand All @@ -144,14 +148,15 @@ type SQLMeta struct {
//
// decodePlan is a decoding function which will be called asynchronously to decode the plan binary to string
// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(dataSink DataSink, decodePlan planBinaryDecodeFunc) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
cancel: cancel,
dataSink: dataSink,
collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
decodePlan: decodePlan,
}
tsr.normalizedSQLMap.Store(&sync.Map{})
tsr.normalizedPlanMap.Store(&sync.Map{})
Expand Down Expand Up @@ -259,8 +264,8 @@ func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, t
others.CPUTimeMsTotal += uint64(totalCPUTimeMs)
}

// addEvictedIntoSortedDataPoints adds the evict dataPoints into others.
// Attention, this function depend on others dataPoints is sorted, and this function will modify the evict dataPoints
// addEvictedIntoSortedDataPoints adds evicted dataPoints into others.
// Attention: this function depends on the others dataPoints being sorted, and it will modify the evicted dataPoints
// to make sure they are sorted by timestamp.
func addEvictedIntoSortedDataPoints(others *dataPoints, evict *dataPoints) *dataPoints {
if others == nil {
Expand Down Expand Up @@ -451,29 +456,14 @@ type collectedData struct {

// ReportData contains data that reporter sends to the agent
type ReportData struct {
// collectedData contains the topN collected records and the `others` record which aggregation all records that is out of Top N.
collectedData []*dataPoints
normalizedSQLMap *sync.Map
normalizedPlanMap *sync.Map
// CPUTimeRecords contains the topN collected records and the `others` record which aggregates all records that are out of Top N.
CPUTimeRecords []tipb.CPUTimeRecord
SQLMetas []tipb.SQLMeta
PlanMetas []tipb.PlanMeta
}

func (d *ReportData) hasData() bool {
if len(d.collectedData) > 0 {
return true
}
cnt := 0
d.normalizedSQLMap.Range(func(key, value interface{}) bool {
cnt++
return false
})
if cnt > 0 {
return true
}
d.normalizedPlanMap.Range(func(key, value interface{}) bool {
cnt++
return false
})
return cnt > 0
return len(d.CPUTimeRecords) != 0 || len(d.SQLMetas) != 0 || len(d.PlanMetas) != 0
}

// reportWorker sends data to the gRPC endpoint from the `reportCollectedDataChan` one by one.
Expand All @@ -497,11 +487,17 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {

// getReportData gets ReportData from the collectedData.
// This function will calculate the topN collected records and the `others` record which aggregates all records that are out of Top N.
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData {
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) *ReportData {
records := getTopNFromCollected(collected)
return tsr.buildReportData(records, collected.normalizedSQLMap, collected.normalizedPlanMap)
}

func getTopNFromCollected(collected collectedData) (records []*dataPoints) {
// Fetch TopN dataPoints.
others := collected.records[keyOthers]
delete(collected.records, keyOthers)
records := make([]*dataPoints, 0, len(collected.records))

records = make([]*dataPoints, 0, len(collected.records))
for _, v := range collected.records {
records = append(records, v)
}
Expand All @@ -514,7 +510,7 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportDa
sort.Sort(others)
}
for _, evict := range evicted {
// SQL meta will not be evicted, since the evicted SQL can be appear on Other components (TiKV) TopN records.
// SQL meta will not be evicted, since the evicted SQL may appear in other components' (TiKV) TopN records.
others = addEvictedIntoSortedDataPoints(others, evict)
}

Expand All @@ -523,14 +519,56 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportDa
records = append(records, others)
}

return ReportData{
collectedData: records,
normalizedSQLMap: collected.normalizedSQLMap,
normalizedPlanMap: collected.normalizedPlanMap,
return
}

// buildReportData converts record data in the dataPoints slice and meta data in sync.Map to ReportData.
//
// Attention, caller should guarantee that no other readers or writers access `sqlMap` and `planMap`, because buildReportData
// will do heavy jobs in sync.Map.Range and it may block other readers and writers.
func (tsr *RemoteTopSQLReporter) buildReportData(records []*dataPoints, sqlMap *sync.Map, planMap *sync.Map) *ReportData {
res := &ReportData{
CPUTimeRecords: make([]tipb.CPUTimeRecord, 0, len(records)),
SQLMetas: make([]tipb.SQLMeta, 0, len(records)),
PlanMetas: make([]tipb.PlanMeta, 0, len(records)),
}

for _, record := range records {
res.CPUTimeRecords = append(res.CPUTimeRecords, tipb.CPUTimeRecord{
RecordListTimestampSec: record.TimestampList,
RecordListCpuTimeMs: record.CPUTimeMsList,
SqlDigest: record.SQLDigest,
PlanDigest: record.PlanDigest,
})
}

sqlMap.Range(func(key, value interface{}) bool {
meta := value.(SQLMeta)
res.SQLMetas = append(res.SQLMetas, tipb.SQLMeta{
SqlDigest: []byte(key.(string)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it by design that they are stored as binary strings?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know. It has been like that for a long while.

NormalizedSql: meta.normalizedSQL,
IsInternalSql: meta.isInternal,
})
return true
})

planMap.Range(func(key, value interface{}) bool {
planDecoded, errDecode := tsr.decodePlan(value.(string))
if errDecode != nil {
logutil.BgLogger().Warn("[top-sql] decode plan failed", zap.Error(errDecode))
return true
}
res.PlanMetas = append(res.PlanMetas, tipb.PlanMeta{
PlanDigest: []byte(key.(string)),
NormalizedPlan: planDecoded,
})
return true
})

return res
}

func (tsr *RemoteTopSQLReporter) doReport(data ReportData) {
func (tsr *RemoteTopSQLReporter) doReport(data *ReportData) {
defer util.Recover("top-sql", "doReport", nil, false)

if !data.hasData() {
Expand Down
4 changes: 2 additions & 2 deletions util/topsql/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *Rem
conf.TopSQL.ReceiverAddress = addr
})

rc := NewSingleTargetDataSink(mockPlanBinaryDecoderFunc)
ts := NewRemoteTopSQLReporter(rc)
rc := NewSingleTargetDataSink()
ts := NewRemoteTopSQLReporter(rc, mockPlanBinaryDecoderFunc)
return ts
}

Expand Down
Loading