statistics: introduce bernoulli sampling method #28999

Merged Oct 28, 2021 · 19 commits · changes shown from 15 commits
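
In brief: this PR gives ANALYZE's row collection a second strategy. The existing reservoir sampler keeps a fixed-size uniform sample; the new Bernoulli sampler keeps each row independently with probability equal to a configured sample rate. A minimal sketch of the Bernoulli idea — illustrative only, not the statistics package's implementation, and `bernoulliSample` is a hypothetical name:

```go
package main

import (
	"fmt"
	"math/rand"
)

// bernoulliSample keeps each row independently with probability rate.
// Unlike a reservoir sample, the result size is not fixed: it is
// binomially distributed with mean rate*len(rows).
func bernoulliSample(rows []int, rate float64, rng *rand.Rand) []int {
	kept := make([]int, 0, int(rate*float64(len(rows))))
	for _, row := range rows {
		if rng.Float64() < rate {
			kept = append(kept, row)
		}
	}
	return kept
}

func main() {
	rng := rand.New(rand.NewSource(42))
	rows := make([]int, 100000)
	for i := range rows {
		rows[i] = i
	}
	sample := bernoulliSample(rows, 0.001, rng)
	fmt.Printf("kept %d of %d rows (expected ~100)\n", len(sample), len(rows))
}
```

Because the kept count at rate r over N rows has mean rN, the rate-adjustment logic added to executor/builder.go below can target an expected sample size.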
70 changes: 34 additions & 36 deletions executor/analyze.go
@@ -770,7 +770,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
// decodeSampleDataWithVirtualColumn constructs the virtual columns by evaluating them from the decoded normal columns.
// If it fails, it returns an error so the caller falls back to the normal decoding path without the virtual columns.
func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
- collector *statistics.ReservoirRowSampleCollector,
+ collector statistics.RowSampleCollector,
fieldTps []*types.FieldType,
virtualColIdx []int,
schema *expression.Schema,
@@ -779,9 +779,9 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
for _, col := range e.schemaForVirtualColEval.Columns {
totFts = append(totFts, col.RetType)
}
- chk := chunk.NewChunkWithCapacity(totFts, len(collector.Samples))
+ chk := chunk.NewChunkWithCapacity(totFts, len(collector.Base().Samples))
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
- for _, sample := range collector.Samples {
+ for _, sample := range collector.Base().Samples {
for i := range sample.Columns {
if schema.Columns[i].VirtualExpr != nil {
continue
@@ -799,7 +799,7 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
iter := chunk.NewIterator4Chunk(chk)
for row, i := iter.Begin(), 0; row != iter.End(); row, i = iter.Next(), i+1 {
datums := row.GetDatumRow(totFts)
- collector.Samples[i].Columns = datums
+ collector.Base().Samples[i].Columns = datums
}
return nil
}
@@ -842,9 +842,9 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}()

l := len(e.analyzePB.ColReq.ColumnsInfo) + len(e.analyzePB.ColReq.ColumnGroups)
- rootRowCollector := statistics.NewReservoirRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), l)
+ rootRowCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
for i := 0; i < l; i++ {
- rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, statistics.NewFMSketch(maxSketchSize))
+ rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}
sc := e.ctx.GetSessionVars().StmtCtx
statsConcurrency, err := getBuildStatsConcurrency(e.ctx)
@@ -894,7 +894,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}
} else {
// If there's no virtual column, or evaluating the virtual columns returned an error, we fall back to normal decoding.
- for _, sample := range rootRowCollector.Samples {
+ for _, sample := range rootRowCollector.Base().Samples {
for i := range sample.Columns {
sample.Columns[i], err = tablecodec.DecodeColumnValue(sample.Columns[i].GetBytes(), &e.colsInfo[i].FieldType, sc.TimeZone)
if err != nil {
@@ -904,7 +904,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}
}

- for _, sample := range rootRowCollector.Samples {
+ for _, sample := range rootRowCollector.Base().Samples {
// Calculate handle from the row data for each row. It will be used to sort the samples.
sample.Handle, err = e.handleCols.BuildHandleByDatums(sample.Columns)
if err != nil {
@@ -916,8 +916,8 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(

// The order of the samples is broken when merging samples from sub-collectors.
// So now we need to sort the samples according to the handle in order to calculate correlation.
- sort.Slice(rootRowCollector.Samples, func(i, j int) bool {
- return rootRowCollector.Samples[i].Handle.Compare(rootRowCollector.Samples[j].Handle) < 0
+ sort.Slice(rootRowCollector.Base().Samples, func(i, j int) bool {
+ return rootRowCollector.Base().Samples[i].Handle.Compare(rootRowCollector.Base().Samples[j].Handle) < 0
})

totalLen := len(e.colsInfo) + len(e.indexes)
@@ -941,7 +941,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
isColumn: true,
slicePos: i,
}
- fmSketches = append(fmSketches, rootRowCollector.FMSketches[i])
+ fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[i])
}

indexPushedDownResult := <-idxNDVPushDownCh
@@ -950,8 +950,8 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}
for _, offset := range indexesWithVirtualColOffsets {
ret := indexPushedDownResult.results[e.indexes[offset].ID]
- rootRowCollector.NullCount[colLen+offset] = ret.Count
- rootRowCollector.FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
+ rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
+ rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
}

// build index stats
@@ -963,7 +963,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
isColumn: false,
slicePos: colLen + i,
}
- fmSketches = append(fmSketches, rootRowCollector.FMSketches[colLen+i])
+ fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[colLen+i])
}
close(buildTaskChan)
panicCnt := 0
@@ -983,7 +983,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
if err != nil {
return 0, nil, nil, nil, nil, err
}
- count = rootRowCollector.Count
+ count = rootRowCollector.Base().Count
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
@@ -1160,7 +1160,7 @@ func (e *AnalyzeColumnsExec) buildSubIndexJobForSpecialIndex(indexInfos []*model
}

type samplingMergeResult struct {
- collector *statistics.ReservoirRowSampleCollector
+ collector statistics.RowSampleCollector
err error
}

@@ -1190,9 +1190,9 @@ func (e *AnalyzeColumnsExec) subMergeWorker(resultCh chan<- *samplingMergeResult
failpoint.Inject("mockAnalyzeSamplingMergeWorkerPanic", func() {
panic("failpoint triggered")
})
- retCollector := statistics.NewReservoirRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), l)
+ retCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
for i := 0; i < l; i++ {
- retCollector.FMSketches = append(retCollector.FMSketches, statistics.NewFMSketch(maxSketchSize))
+ retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}
for {
data, ok := <-taskCh
@@ -1205,19 +1205,17 @@ func (e *AnalyzeColumnsExec) subMergeWorker(resultCh chan<- *samplingMergeResult
resultCh <- &samplingMergeResult{err: err}
return
}
- subCollector := &statistics.ReservoirRowSampleCollector{
- MaxSampleSize: int(e.analyzePB.ColReq.SampleSize),
- }
- subCollector.FromProto(colResp.RowCollector)
- e.job.Update(subCollector.Count)
+ subCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), e.analyzePB.ColReq.GetSampleRate(), l)
+ subCollector.Base().FromProto(colResp.RowCollector)
+ e.job.Update(subCollector.Base().Count)
retCollector.MergeCollector(subCollector)
}
resultCh <- &samplingMergeResult{collector: retCollector}
}

type samplingBuildTask struct {
id int64
- rootRowCollector *statistics.ReservoirRowSampleCollector
+ rootRowCollector statistics.RowSampleCollector
tp *types.FieldType
isColumn bool
slicePos int
@@ -1256,8 +1254,8 @@ workLoop:
topns[task.slicePos] = nil
continue
}
- sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.MaxSampleSize)
- for j, row := range task.rootRowCollector.Samples {
+ sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.Base().Samples.Len())
+ for j, row := range task.rootRowCollector.Base().Samples {
if row.Columns[task.slicePos].IsNull() {
continue
}
@@ -1276,17 +1274,17 @@ workLoop:
}
collector = &statistics.SampleCollector{
Samples: sampleItems,
- NullCount: task.rootRowCollector.NullCount[task.slicePos],
- Count: task.rootRowCollector.Count - task.rootRowCollector.NullCount[task.slicePos],
- FMSketch: task.rootRowCollector.FMSketches[task.slicePos],
- TotalSize: task.rootRowCollector.TotalSizes[task.slicePos],
+ NullCount: task.rootRowCollector.Base().NullCount[task.slicePos],
+ Count: task.rootRowCollector.Base().Count - task.rootRowCollector.Base().NullCount[task.slicePos],
+ FMSketch: task.rootRowCollector.Base().FMSketches[task.slicePos],
+ TotalSize: task.rootRowCollector.Base().TotalSizes[task.slicePos],
}
} else {
var tmpDatum types.Datum
var err error
idx := e.indexes[task.slicePos-colLen]
- sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.MaxSampleSize)
- for _, row := range task.rootRowCollector.Samples {
+ sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.Base().Samples.Len())
+ for _, row := range task.rootRowCollector.Base().Samples {
if len(idx.Columns) == 1 && row.Columns[idx.Columns[0].Offset].IsNull() {
continue
}
@@ -1315,10 +1313,10 @@ workLoop:
}
collector = &statistics.SampleCollector{
Samples: sampleItems,
- NullCount: task.rootRowCollector.NullCount[task.slicePos],
- Count: task.rootRowCollector.Count - task.rootRowCollector.NullCount[task.slicePos],
- FMSketch: task.rootRowCollector.FMSketches[task.slicePos],
- TotalSize: task.rootRowCollector.TotalSizes[task.slicePos],
+ NullCount: task.rootRowCollector.Base().NullCount[task.slicePos],
+ Count: task.rootRowCollector.Base().Count - task.rootRowCollector.Base().NullCount[task.slicePos],
+ FMSketch: task.rootRowCollector.Base().FMSketches[task.slicePos],
+ TotalSize: task.rootRowCollector.Base().TotalSizes[task.slicePos],
}
}
if task.isColumn {
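
A pattern worth noting in the hunks above: every direct field access on *statistics.ReservoirRowSampleCollector becomes collector.Base().<field> behind the new statistics.RowSampleCollector interface, so reservoir and Bernoulli collectors share the same call sites. A hedged sketch of the shape this implies — field names follow the diff, but the types are simplified and everything except RowSampleCollector, Base, and MergeCollector is hypothetical:

```go
package statssketch // illustrative stand-in for the statistics package

// baseCollector holds the state shared by every sampler variant.
type baseCollector struct {
	Samples    [][]byte // simplified stand-in for sampled rows
	NullCount  []int64
	TotalSizes []int64
	Count      int64
}

// RowSampleCollector is the interface analyze.go now programs against;
// Base() exposes the shared state regardless of the sampling strategy.
type RowSampleCollector interface {
	Base() *baseCollector
	MergeCollector(sub RowSampleCollector)
}

// A Bernoulli variant embeds the base, so Base() is trivial; a
// reservoir variant would embed the same struct.
type bernoulliRowSampleCollector struct {
	baseCollector
	SampleRate float64
}

func (s *bernoulliRowSampleCollector) Base() *baseCollector { return &s.baseCollector }

// MergeCollector folds one sub-collector (e.g. one region's response)
// into this one. Bernoulli sub-samples drawn at the same rate merge by
// concatenating samples and summing the per-column counters.
func (s *bernoulliRowSampleCollector) MergeCollector(sub RowSampleCollector) {
	s.Samples = append(s.Samples, sub.Base().Samples...)
	s.Count += sub.Base().Count
	for i := range s.NullCount {
		s.NullCount[i] += sub.Base().NullCount[i]
		s.TotalSizes[i] += sub.Base().TotalSizes[i]
	}
}
```

This also explains why subMergeWorker above stays sampler-agnostic: it only needs Base() and MergeCollector.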
32 changes: 32 additions & 0 deletions executor/builder.go
@@ -17,6 +17,7 @@ package executor
import (
"bytes"
"context"
"math"
"sort"
"strconv"
"strings"
@@ -2216,9 +2217,17 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC
baseCount: count,
baseModifyCnt: modifyCount,
}
+ sampleRate := new(float64)
+ if opts[ast.AnalyzeOptNumSamples] == 0 {
+ *sampleRate = math.Float64frombits(opts[ast.AnalyzeOptSampleRate])
+ if *sampleRate < 0 {
+ *sampleRate = b.getAdjustedSampleRate(b.ctx, task.TableID.GetStatisticsID(), task.TblInfo)
+ }
+ }
e.analyzePB.ColReq = &tipb.AnalyzeColumnsReq{
BucketSize: int64(opts[ast.AnalyzeOptNumBuckets]),
SampleSize: int64(opts[ast.AnalyzeOptNumSamples]),
+ SampleRate: sampleRate,
SketchSize: maxSketchSize,
ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle),
ColumnGroups: colGroups,
@@ -2233,6 +2242,29 @@
return &analyzeTask{taskType: colTask, colExec: e, job: job}
}

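One detail in the hunk above: ANALYZE options travel as map[ast.AnalyzeOptionType]uint64, so the float64 sample rate is carried as its IEEE-754 bit pattern and recovered with math.Float64frombits; the *sampleRate < 0 check then routes a negative sentinel to getAdjustedSampleRate. A small round-trip sketch, with a hypothetical string key standing in for ast.AnalyzeOptSampleRate:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Store: the float64 rate is bit-cast into the uint64 option slot.
	opts := map[string]uint64{"sample_rate": math.Float64bits(0.25)}

	// Load: Float64frombits restores the exact value; the round trip is
	// lossless because only the representation changes, not the bits.
	rate := math.Float64frombits(opts["sample_rate"])
	fmt.Println(rate) // 0.25
}
```
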
+ func (b *executorBuilder) getAdjustedSampleRate(sctx sessionctx.Context, tid int64, tblInfo *model.TableInfo) float64 {
+ statsHandle := domain.GetDomain(sctx).StatsHandle()
+ defaultRate := 0.001
+ if statsHandle == nil {
+ return defaultRate
+ }
+ var statsTbl *statistics.Table
+ if tid == tblInfo.ID {
+ statsTbl = statsHandle.GetTableStats(tblInfo)
+ } else {
+ statsTbl = statsHandle.GetPartitionStats(tblInfo, tid)
+ }
+ if statsTbl == nil {
+ return defaultRate
+ }
+ // If the count in stats_meta is still 0, the table is not large, so we scan all rows.
+ if statsTbl.Count == 0 {
+ return 1
+ }
+ // We expect to scan about 100000 rows; the 110000 numerator leaves a little headroom.
+ return math.Min(1, 110000/float64(statsTbl.Count))
+ }

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask {
if task.StatsVersion == statistics.Version2 {
return b.buildAnalyzeSamplingPushdown(task, opts, autoAnalyze, schemaForVirtualColEval)
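
Worked through, getAdjustedSampleRate targets a roughly constant scan size: with a stats_meta count of N it returns min(1, 110000/N), i.e. about 110000 sampled rows in expectation once N exceeds 110000, and a full scan below that. A quick illustrative check:

```go
package main

import (
	"fmt"
	"math"
)

// adjustedRate mirrors the formula above: cap the expected scan at
// about 110000 rows, degrading to a full scan for small tables.
func adjustedRate(count float64) float64 {
	return math.Min(1, 110000/count)
}

func main() {
	for _, n := range []float64{5e4, 1e6, 1e9} {
		r := adjustedRate(n)
		fmt.Printf("rows=%.0f rate=%.6f expected sample=%.0f\n", n, r, n*r)
	}
}
```

The go.mod/go.sum bump below moves pingcap/tipb to a newer revision — presumably the one whose AnalyzeColumnsReq proto carries the SampleRate field used above.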
2 changes: 1 addition & 1 deletion go.mod
@@ -53,7 +53,7 @@ require (
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible
github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
- github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4
+ github.com/pingcap/tipb v0.0.0-20211026080602-ec68283c1735
github.com/prometheus/client_golang v1.5.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.9.1
4 changes: 2 additions & 2 deletions go.sum
@@ -614,8 +614,8 @@ github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:O
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
- github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4 h1:9Ef4j3DLmUidURfob0tf94v+sqvozqdCTr7e5hi19qU=
- github.com/pingcap/tipb v0.0.0-20211025074540-e1c7362eeeb4/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
+ github.com/pingcap/tipb v0.0.0-20211026080602-ec68283c1735 h1:kS8pJNUnF3ENkjtBcJeMe/W8+9RtrChcortoyljCwwc=
+ github.com/pingcap/tipb v0.0.0-20211026080602-ec68283c1735/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=