Commit

use rate by default
winoros committed Oct 25, 2021
1 parent 9e13b69 commit ddbfd2c
Showing 6 changed files with 68 additions and 35 deletions.
62 changes: 31 additions & 31 deletions executor/analyze.go
@@ -770,7 +770,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
// decodeSampleDataWithVirtualColumn constructs the virtual column by evaluating from the decoded normal columns.
// If it failed, it would return false to trigger normal decoding way without the virtual column.
func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
collector *statistics.ReservoirRowSampleCollector,
collector statistics.RowSampleCollector,
fieldTps []*types.FieldType,
virtualColIdx []int,
schema *expression.Schema,
@@ -779,9 +779,9 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
for _, col := range e.schemaForVirtualColEval.Columns {
totFts = append(totFts, col.RetType)
}
chk := chunk.NewChunkWithCapacity(totFts, len(collector.Samples))
chk := chunk.NewChunkWithCapacity(totFts, len(collector.Base().Samples))
decoder := codec.NewDecoder(chk, e.ctx.GetSessionVars().Location())
for _, sample := range collector.Samples {
for _, sample := range collector.Base().Samples {
for i := range sample.Columns {
if schema.Columns[i].VirtualExpr != nil {
continue
@@ -799,7 +799,7 @@ func (e AnalyzeColumnsExec) decodeSampleDataWithVirtualColumn(
iter := chunk.NewIterator4Chunk(chk)
for row, i := iter.Begin(), 0; row != iter.End(); row, i = iter.Next(), i+1 {
datums := row.GetDatumRow(totFts)
collector.Samples[i].Columns = datums
collector.Base().Samples[i].Columns = datums
}
return nil
}
@@ -842,9 +842,9 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}()

l := len(e.analyzePB.ColReq.ColumnsInfo) + len(e.analyzePB.ColReq.ColumnGroups)
rootRowCollector := statistics.NewReservoirRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), l)
rootRowCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), *e.analyzePB.ColReq.SampleRate, l)
for i := 0; i < l; i++ {
rootRowCollector.FMSketches = append(rootRowCollector.FMSketches, statistics.NewFMSketch(maxSketchSize))
rootRowCollector.Base().FMSketches = append(rootRowCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}
sc := e.ctx.GetSessionVars().StmtCtx
statsConcurrency, err := getBuildStatsConcurrency(e.ctx)
@@ -894,7 +894,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}
} else {
// If there's no virtual column or we meet error during eval virtual column, we fallback to normal decode otherwise.
for _, sample := range rootRowCollector.Samples {
for _, sample := range rootRowCollector.Base().Samples {
for i := range sample.Columns {
sample.Columns[i], err = tablecodec.DecodeColumnValue(sample.Columns[i].GetBytes(), &e.colsInfo[i].FieldType, sc.TimeZone)
if err != nil {
@@ -904,7 +904,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}
}

for _, sample := range rootRowCollector.Samples {
for _, sample := range rootRowCollector.Base().Samples {
// Calculate handle from the row data for each row. It will be used to sort the samples.
sample.Handle, err = e.handleCols.BuildHandleByDatums(sample.Columns)
if err != nil {
@@ -916,8 +916,8 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(

// The order of the samples are broken when merging samples from sub-collectors.
// So now we need to sort the samples according to the handle in order to calculate correlation.
sort.Slice(rootRowCollector.Samples, func(i, j int) bool {
return rootRowCollector.Samples[i].Handle.Compare(rootRowCollector.Samples[j].Handle) < 0
sort.Slice(rootRowCollector.Base().Samples, func(i, j int) bool {
return rootRowCollector.Base().Samples[i].Handle.Compare(rootRowCollector.Base().Samples[j].Handle) < 0
})

totalLen := len(e.colsInfo) + len(e.indexes)
@@ -941,7 +941,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
isColumn: true,
slicePos: i,
}
fmSketches = append(fmSketches, rootRowCollector.FMSketches[i])
fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[i])
}

indexPushedDownResult := <-idxNDVPushDownCh
Expand All @@ -950,8 +950,8 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
}
for _, offset := range indexesWithVirtualColOffsets {
ret := indexPushedDownResult.results[e.indexes[offset].ID]
rootRowCollector.NullCount[colLen+offset] = ret.Count
rootRowCollector.FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
rootRowCollector.Base().NullCount[colLen+offset] = ret.Count
rootRowCollector.Base().FMSketches[colLen+offset] = ret.Ars[0].Fms[0]
}

// build index stats
@@ -963,7 +963,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
isColumn: false,
slicePos: colLen + i,
}
fmSketches = append(fmSketches, rootRowCollector.FMSketches[colLen+i])
fmSketches = append(fmSketches, rootRowCollector.Base().FMSketches[colLen+i])
}
close(buildTaskChan)
panicCnt := 0
@@ -983,7 +983,7 @@ func (e *AnalyzeColumnsExec) buildSamplingStats(
if err != nil {
return 0, nil, nil, nil, nil, err
}
count = rootRowCollector.Count
count = rootRowCollector.Base().Count
if needExtStats {
statsHandle := domain.GetDomain(e.ctx).StatsHandle()
extStats, err = statsHandle.BuildExtendedStats(e.TableID.GetStatisticsID(), e.colsInfo, sampleCollectors)
@@ -1160,7 +1160,7 @@ func (e *AnalyzeColumnsExec) buildSubIndexJobForSpecialIndex(indexInfos []*model
}

type samplingMergeResult struct {
collector *statistics.ReservoirRowSampleCollector
collector statistics.RowSampleCollector
err error
}

@@ -1190,9 +1190,9 @@ func (e *AnalyzeColumnsExec) subMergeWorker(resultCh chan<- *samplingMergeResult
failpoint.Inject("mockAnalyzeSamplingMergeWorkerPanic", func() {
panic("failpoint triggered")
})
retCollector := statistics.NewReservoirRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), l)
retCollector := statistics.NewRowSampleCollector(int(e.analyzePB.ColReq.SampleSize), *e.analyzePB.ColReq.SampleRate, l)
for i := 0; i < l; i++ {
retCollector.FMSketches = append(retCollector.FMSketches, statistics.NewFMSketch(maxSketchSize))
retCollector.Base().FMSketches = append(retCollector.Base().FMSketches, statistics.NewFMSketch(maxSketchSize))
}
for {
data, ok := <-taskCh
@@ -1217,7 +1217,7 @@ func (e *AnalyzeColumnsExec) subMergeWorker(resultCh chan<- *samplingMergeResult

type samplingBuildTask struct {
id int64
rootRowCollector *statistics.ReservoirRowSampleCollector
rootRowCollector statistics.RowSampleCollector
tp *types.FieldType
isColumn bool
slicePos int
@@ -1256,8 +1256,8 @@ workLoop:
topns[task.slicePos] = nil
continue
}
sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.MaxSampleSize)
for j, row := range task.rootRowCollector.Samples {
sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.Base().Samples.Len())
for j, row := range task.rootRowCollector.Base().Samples {
if row.Columns[task.slicePos].IsNull() {
continue
}
@@ -1276,17 +1276,17 @@ workLoop:
}
collector = &statistics.SampleCollector{
Samples: sampleItems,
NullCount: task.rootRowCollector.NullCount[task.slicePos],
Count: task.rootRowCollector.Count - task.rootRowCollector.NullCount[task.slicePos],
FMSketch: task.rootRowCollector.FMSketches[task.slicePos],
TotalSize: task.rootRowCollector.TotalSizes[task.slicePos],
NullCount: task.rootRowCollector.Base().NullCount[task.slicePos],
Count: task.rootRowCollector.Base().Count - task.rootRowCollector.Base().NullCount[task.slicePos],
FMSketch: task.rootRowCollector.Base().FMSketches[task.slicePos],
TotalSize: task.rootRowCollector.Base().TotalSizes[task.slicePos],
}
} else {
var tmpDatum types.Datum
var err error
idx := e.indexes[task.slicePos-colLen]
sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.MaxSampleSize)
for _, row := range task.rootRowCollector.Samples {
sampleItems := make([]*statistics.SampleItem, 0, task.rootRowCollector.Base().Samples.Len())
for _, row := range task.rootRowCollector.Base().Samples {
if len(idx.Columns) == 1 && row.Columns[idx.Columns[0].Offset].IsNull() {
continue
}
@@ -1315,10 +1315,10 @@ workLoop:
}
collector = &statistics.SampleCollector{
Samples: sampleItems,
NullCount: task.rootRowCollector.NullCount[task.slicePos],
Count: task.rootRowCollector.Count - task.rootRowCollector.NullCount[task.slicePos],
FMSketch: task.rootRowCollector.FMSketches[task.slicePos],
TotalSize: task.rootRowCollector.TotalSizes[task.slicePos],
NullCount: task.rootRowCollector.Base().NullCount[task.slicePos],
Count: task.rootRowCollector.Base().Count - task.rootRowCollector.Base().NullCount[task.slicePos],
FMSketch: task.rootRowCollector.Base().FMSketches[task.slicePos],
TotalSize: task.rootRowCollector.Base().TotalSizes[task.slicePos],
}
}
if task.isColumn {
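The executor/analyze.go changes above are mechanical: every use of the concrete *statistics.ReservoirRowSampleCollector becomes the statistics.RowSampleCollector interface, and the shared state (Samples, NullCount, FMSketches, TotalSizes, Count) is reached through Base(). A rough sketch of that shape, assuming simplified stand-in types rather than the real statistics package:

```go
package sketch

// ReservoirRowSampleItem is a pared-down stand-in for the real sample item.
type ReservoirRowSampleItem struct {
	Columns []string // placeholder for []types.Datum
	Handle  int64    // placeholder for kv.Handle
}

// baseCollector carries the state that callers now reach via Base().
type baseCollector struct {
	Samples    []*ReservoirRowSampleItem
	NullCount  []int64
	FMSketches []interface{} // placeholder for []*FMSketch
	TotalSizes []int64
	Count      int64
}

// RowSampleCollector is the interface the diff switches to; both a
// reservoir-based and a rate-based collector can satisfy it.
type RowSampleCollector interface {
	Base() *baseCollector
}

type reservoirRowSampleCollector struct {
	baseCollector
	MaxSampleSize int
}

func (c *reservoirRowSampleCollector) Base() *baseCollector { return &c.baseCollector }

type bernoulliRowSampleCollector struct {
	baseCollector
	SampleRate float64
}

func (c *bernoulliRowSampleCollector) Base() *baseCollector { return &c.baseCollector }

// NewRowSampleCollector picks an implementation; the selection rule shown
// here (a fixed sample size wins over a rate) is an assumption for this
// sketch, not the TiDB code.
func NewRowSampleCollector(maxSampleSize int, sampleRate float64, totalLen int) RowSampleCollector {
	if maxSampleSize > 0 {
		return &reservoirRowSampleCollector{MaxSampleSize: maxSampleSize}
	}
	return &bernoulliRowSampleCollector{SampleRate: sampleRate}
}
```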
30 changes: 30 additions & 0 deletions executor/builder.go
@@ -17,6 +17,7 @@ package executor
import (
"bytes"
"context"
"math"
"sort"
"strconv"
"strings"
@@ -2216,9 +2217,15 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC
baseCount: count,
baseModifyCnt: modifyCount,
}
sampleRate := new(float64)
*sampleRate = math.Float64frombits(opts[ast.AnalyzeOptSampleRate])
if *sampleRate < 0 {
*sampleRate = b.getAdjustedSampleRate(b.ctx, task.TableID.GetStatisticsID(), task.TblInfo)
}
e.analyzePB.ColReq = &tipb.AnalyzeColumnsReq{
BucketSize: int64(opts[ast.AnalyzeOptNumBuckets]),
SampleSize: int64(opts[ast.AnalyzeOptNumSamples]),
SampleRate: sampleRate,
SketchSize: maxSketchSize,
ColumnsInfo: util.ColumnsToProto(task.ColsInfo, task.TblInfo.PKIsHandle),
ColumnGroups: colGroups,
@@ -2233,6 +2240,29 @@ func (b *executorBuilder) buildAnalyzeSamplingPushdown(task plannercore.AnalyzeC
return &analyzeTask{taskType: colTask, colExec: e, job: job}
}

func (b *executorBuilder) getAdjustedSampleRate(sctx sessionctx.Context, tid int64, tblInfo *model.TableInfo) float64 {
statsHandle := domain.GetDomain(sctx).StatsHandle()
defaultRate := 0.001
if statsHandle == nil {
return defaultRate
}
var statsTbl *statistics.Table
if tid == tblInfo.ID {
statsTbl = statsHandle.GetTableStats(tblInfo)
} else {
statsTbl = statsHandle.GetPartitionStats(tblInfo, tid)
}
if statsTbl == nil {
return defaultRate
}
// If the count in stats_meta is still 0, the table is not large, we scan all rows.
if statsTbl.Count == 0 {
return 1
}
// We are expected to scan about 100000 rows.
return math.Min(1, 100000/float64(statsTbl.Count))
}

func (b *executorBuilder) buildAnalyzeColumnsPushdown(task plannercore.AnalyzeColumnsTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string, schemaForVirtualColEval *expression.Schema) *analyzeTask {
if task.StatsVersion == statistics.Version2 {
return b.buildAnalyzeSamplingPushdown(task, opts, autoAnalyze, schemaForVirtualColEval)
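The new getAdjustedSampleRate in builder.go aims to sample roughly 100000 rows: with no statistics handle or stats table it falls back to a 0.001 rate, an apparently empty table is scanned in full, and otherwise the rate is min(1, 100000/count). A self-contained illustration of that rule (the helper name and the example row counts below are made up for the demo):

```go
package main

import (
	"fmt"
	"math"
)

// adjustedSampleRate mirrors the rule added in the diff: target ~100000
// sampled rows, scan everything for empty tables, default to 0.001 when
// no statistics are available.
func adjustedSampleRate(haveStats bool, rowCount int64) float64 {
	const defaultRate = 0.001
	if !haveStats {
		return defaultRate
	}
	if rowCount == 0 {
		return 1 // count in stats_meta is still 0: scan all rows
	}
	return math.Min(1, 100000/float64(rowCount))
}

func main() {
	fmt.Println(adjustedSampleRate(false, 0))       // 0.001
	fmt.Println(adjustedSampleRate(true, 0))        // 1
	fmt.Println(adjustedSampleRate(true, 50000))    // 1
	fmt.Println(adjustedSampleRate(true, 10000000)) // 0.01
}
```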
2 changes: 2 additions & 0 deletions go.mod
@@ -102,3 +102,5 @@ replace github.com/pingcap/tidb/parser => ./parser

// fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible

replace github.com/pingcap/tipb => github.com/winoros/tipb v0.0.0-20211025094955-759361265b18
4 changes: 2 additions & 2 deletions go.sum
@@ -614,8 +614,6 @@ github.com/pingcap/tidb-dashboard v0.0.0-20210312062513-eef5d6404638/go.mod h1:O
github.com/pingcap/tidb-dashboard v0.0.0-20210716172320-2226872e3296/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.0.3+incompatible h1:vYMrW9ux+3HRMeRZ1fUOjy2nyiodtuVyAyK270EKBEs=
github.com/pingcap/tidb-tools v5.0.3+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20211008080435-3fd327dfce0e h1:fZY5T65QWiPc9noQJ1UkdwejZyBZjNfxzSyTcBjKrEU=
github.com/pingcap/tipb v0.0.0-20211008080435-3fd327dfce0e/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -774,6 +772,8 @@ github.com/vmihailenco/msgpack/v5 v5.0.0-beta.1/go.mod h1:xlngVLeyQ/Qi05oQxhQ+oT
github.com/vmihailenco/tagparser v0.1.1/go.mod h1:OeAg3pn3UbLjkWt+rN9oFYB6u/cQgqMEUPoW2WPyhdI=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f h1:9DDCDwOyEy/gId+IEMrFHLuQ5R/WV0KNxWLler8X2OY=
github.com/wangjohn/quickselect v0.0.0-20161129230411-ed8402a42d5f/go.mod h1:8sdOQnirw1PrcnTJYkmW1iOHtUmblMmGdUOHyWYycLI=
github.com/winoros/tipb v0.0.0-20211025094955-759361265b18 h1:4Bpj20cgvdxLdIRK0zuxHlAgR8FRa8CNwopcaPW2mZ4=
github.com/winoros/tipb v0.0.0-20211025094955-759361265b18/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
4 changes: 2 additions & 2 deletions planner/core/planbuilder.go
@@ -2067,8 +2067,8 @@ var analyzeOptionDefaultV2 = map[ast.AnalyzeOptionType]uint64{
ast.AnalyzeOptNumTopN: 500,
ast.AnalyzeOptCMSketchWidth: 2048,
ast.AnalyzeOptCMSketchDepth: 5,
ast.AnalyzeOptNumSamples: 100000,
ast.AnalyzeOptSampleRate: math.Float64bits(0),
ast.AnalyzeOptNumSamples: 0,
ast.AnalyzeOptSampleRate: math.Float64bits(-1),
}

func handleAnalyzeOptions(opts []ast.AnalyzeOpt, statsVer int) (map[ast.AnalyzeOptionType]uint64, error) {
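The planbuilder.go change works together with the builder.go decoding above: the analyze option map stores uint64 values, so the sample-rate default is encoded with math.Float64bits(-1), and any negative decoded value means "not explicitly set, derive the rate automatically". A tiny round-trip demo of that encoding (the map key and the 0.01 fallback are illustrative only):

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	// Store the float rate in a uint64-valued options map, as the planner does.
	opts := map[string]uint64{
		"sample_rate": math.Float64bits(-1), // -1 marks "use the adjusted rate"
	}

	rate := math.Float64frombits(opts["sample_rate"])
	if rate < 0 {
		// In the executor this branch calls getAdjustedSampleRate; here we
		// just substitute a placeholder value.
		rate = 0.01
	}
	fmt.Println(rate) // 0.01
}
```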
1 change: 1 addition & 0 deletions store/mockstore/unistore/cophandler/analyze.go
@@ -431,6 +431,7 @@ func handleAnalyzeFullSamplingReq(
ColGroups: colGroups,
MaxSampleSize: int(colReq.SampleSize),
MaxFMSketchSize: int(colReq.SketchSize),
SampleRate: colReq.GetSampleRate(),
Rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
collector, err := builder.Collect()
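Finally, the unistore handler forwards SampleRate into the sample builder, which is what allows rate-based (Bernoulli) sampling instead of a fixed-size reservoir when a rate is set. A rough standalone sketch of such sampling, not the unistore implementation:

```go
package main

import (
	"fmt"
	"math/rand"
)

// sampleByRate keeps each row independently with probability rate.
func sampleByRate(rows [][]byte, rate float64, rng *rand.Rand) [][]byte {
	var kept [][]byte
	for _, row := range rows {
		if rng.Float64() < rate {
			kept = append(kept, row)
		}
	}
	return kept
}

func main() {
	rng := rand.New(rand.NewSource(42))
	rows := make([][]byte, 1000)
	fmt.Println(len(sampleByRate(rows, 0.1, rng))) // roughly 100 of 1000 rows
}
```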
