Skip to content

Commit 2b088ac

Browse files
hawkingrei authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#45998
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
1 parent fdc19bb commit 2b088ac

File tree

4 files changed

+261
-8
lines changed

4 files changed

+261
-8
lines changed

statistics/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ go_test(
7171
name = "statistics_test",
7272
timeout = "short",
7373
srcs = [
74+
"cmsketch_bench_test.go",
7475
"cmsketch_test.go",
7576
"feedback_test.go",
7677
"fmsketch_test.go",

statistics/cmsketch_bench_test.go

+161
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright 2023 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package statistics_test
16+
17+
import (
18+
"fmt"
19+
"testing"
20+
"time"
21+
22+
"github.com/pingcap/tidb/parser/mysql"
23+
"github.com/pingcap/tidb/sessionctx/stmtctx"
24+
"github.com/pingcap/tidb/statistics"
25+
"github.com/pingcap/tidb/statistics/handle"
26+
"github.com/pingcap/tidb/types"
27+
"github.com/pingcap/tidb/util/chunk"
28+
"github.com/pingcap/tidb/util/codec"
29+
"github.com/stretchr/testify/require"
30+
)
31+
32+
// cmd: go test -run=^$ -bench=BenchmarkMergePartTopN2GlobalTopNWithHists -benchmem github.com/pingcap/tidb/statistics
33+
func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
34+
loc := time.UTC
35+
sc := &stmtctx.StatementContext{TimeZone: loc}
36+
version := 1
37+
isKilled := uint32(0)
38+
39+
// Prepare TopNs.
40+
topNs := make([]*statistics.TopN, 0, partitions)
41+
for i := 0; i < partitions; i++ {
42+
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
43+
topN := statistics.NewTopN(3)
44+
{
45+
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
46+
require.NoError(b, err)
47+
topN.AppendTopN(key1, 2)
48+
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
49+
require.NoError(b, err)
50+
topN.AppendTopN(key2, 2)
51+
if i%2 == 0 {
52+
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
53+
require.NoError(b, err)
54+
topN.AppendTopN(key3, 3)
55+
}
56+
}
57+
topNs = append(topNs, topN)
58+
}
59+
60+
// Prepare Hists.
61+
hists := make([]*statistics.Histogram, 0, partitions)
62+
for i := 0; i < partitions; i++ {
63+
// Construct Hist
64+
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
65+
h.Bounds.AppendInt64(0, 1)
66+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
67+
h.Bounds.AppendInt64(0, 2)
68+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
69+
h.Bounds.AppendInt64(0, 3)
70+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
71+
h.Bounds.AppendInt64(0, 4)
72+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
73+
hists = append(hists, h)
74+
}
75+
76+
b.ResetTimer()
77+
for i := 0; i < b.N; i++ {
78+
// Benchmark merge 10 topN.
79+
_, _, _, _ = statistics.MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
80+
}
81+
}
82+
83+
// cmd: go test -run=^$ -bench=BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists -benchmem github.com/pingcap/tidb/statistics
84+
func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *testing.B) {
85+
loc := time.UTC
86+
sc := &stmtctx.StatementContext{TimeZone: loc}
87+
version := 1
88+
isKilled := uint32(0)
89+
90+
// Prepare TopNs.
91+
topNs := make([]*statistics.TopN, 0, partitions)
92+
for i := 0; i < partitions; i++ {
93+
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
94+
topN := statistics.NewTopN(3)
95+
{
96+
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
97+
require.NoError(b, err)
98+
topN.AppendTopN(key1, 2)
99+
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
100+
require.NoError(b, err)
101+
topN.AppendTopN(key2, 2)
102+
if i%2 == 0 {
103+
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
104+
require.NoError(b, err)
105+
topN.AppendTopN(key3, 3)
106+
}
107+
}
108+
topNs = append(topNs, topN)
109+
}
110+
111+
// Prepare Hists.
112+
hists := make([]*statistics.Histogram, 0, partitions)
113+
for i := 0; i < partitions; i++ {
114+
// Construct Hist
115+
h := statistics.NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
116+
h.Bounds.AppendInt64(0, 1)
117+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 20})
118+
h.Bounds.AppendInt64(0, 2)
119+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
120+
h.Bounds.AppendInt64(0, 3)
121+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 30})
122+
h.Bounds.AppendInt64(0, 4)
123+
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
124+
hists = append(hists, h)
125+
}
126+
wrapper := &statistics.StatsWrapper{
127+
AllTopN: topNs,
128+
AllHg: hists,
129+
}
130+
const mergeConcurrency = 4
131+
batchSize := len(wrapper.AllTopN) / mergeConcurrency
132+
if batchSize < 1 {
133+
batchSize = 1
134+
} else if batchSize > handle.MaxPartitionMergeBatchSize {
135+
batchSize = handle.MaxPartitionMergeBatchSize
136+
}
137+
b.ResetTimer()
138+
for i := 0; i < b.N; i++ {
139+
// Benchmark merge 10 topN.
140+
_, _, _, _ = handle.MergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
141+
}
142+
}
143+
144+
var (
	// benchmarkSizes lists the partition counts exercised by
	// BenchmarkMergePartTopN2GlobalTopNWithHists.
	benchmarkSizes = []int{100, 1000, 10000, 100000, 1000000, 10000000}
	// benchmarkConcurrencySizes lists the partition counts exercised by
	// BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists; it goes one order
	// of magnitude further than benchmarkSizes.
	benchmarkConcurrencySizes = []int{100, 1000, 10000, 100000, 1000000, 10000000, 100000000}
)
146+
147+
func BenchmarkMergePartTopN2GlobalTopNWithHists(b *testing.B) {
148+
for _, size := range benchmarkSizes {
149+
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
150+
benchmarkMergePartTopN2GlobalTopNWithHists(size, b)
151+
})
152+
}
153+
}
154+
155+
func BenchmarkMergeGlobalStatsTopNByConcurrencyWithHists(b *testing.B) {
156+
for _, size := range benchmarkConcurrencySizes {
157+
b.Run(fmt.Sprintf("Size%d", size), func(b *testing.B) {
158+
benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(size, b)
159+
})
160+
}
161+
}

statistics/cmsketch_test.go

+87
Original file line numberDiff line numberDiff line change
@@ -303,3 +303,90 @@ func TestCMSketchCodingTopN(t *testing.T) {
303303
_, _, err = DecodeCMSketchAndTopN([]byte{}, rows)
304304
require.NoError(t, err)
305305
}
306+
<<<<<<< HEAD
307+
=======
308+
309+
func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) {
310+
loc := time.UTC
311+
sc := &stmtctx.StatementContext{TimeZone: loc}
312+
version := 1
313+
isKilled := uint32(0)
314+
315+
// Prepare TopNs.
316+
topNs := make([]*TopN, 0, 10)
317+
for i := 0; i < 10; i++ {
318+
// Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3.
319+
topN := NewTopN(3)
320+
{
321+
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1))
322+
require.NoError(t, err)
323+
topN.AppendTopN(key1, 2)
324+
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2))
325+
require.NoError(t, err)
326+
topN.AppendTopN(key2, 2)
327+
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3))
328+
require.NoError(t, err)
329+
topN.AppendTopN(key3, 3)
330+
}
331+
topNs = append(topNs, topN)
332+
}
333+
334+
// Test merge 2 topN with nil hists.
335+
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled)
336+
require.NoError(t, err)
337+
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
338+
require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows")
339+
require.Len(t, leftTopN, 1, "should have 1 left topN")
340+
}
341+
342+
func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) {
343+
loc := time.UTC
344+
sc := &stmtctx.StatementContext{TimeZone: loc}
345+
version := 1
346+
isKilled := uint32(0)
347+
348+
// Prepare TopNs.
349+
topNs := make([]*TopN, 0, 10)
350+
for i := 0; i < 10; i++ {
351+
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
352+
topN := NewTopN(3)
353+
{
354+
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
355+
require.NoError(t, err)
356+
topN.AppendTopN(key1, 2)
357+
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
358+
require.NoError(t, err)
359+
topN.AppendTopN(key2, 2)
360+
if i%2 == 0 {
361+
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
362+
require.NoError(t, err)
363+
topN.AppendTopN(key3, 3)
364+
}
365+
}
366+
topNs = append(topNs, topN)
367+
}
368+
369+
// Prepare Hists.
370+
hists := make([]*Histogram, 0, 10)
371+
for i := 0; i < 10; i++ {
372+
// Construct Hist
373+
h := NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
374+
h.Bounds.AppendInt64(0, 1)
375+
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 20})
376+
h.Bounds.AppendInt64(0, 2)
377+
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
378+
h.Bounds.AppendInt64(0, 3)
379+
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
380+
h.Bounds.AppendInt64(0, 4)
381+
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 40})
382+
hists = append(hists, h)
383+
}
384+
385+
// Test merge 2 topN.
386+
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled)
387+
require.NoError(t, err)
388+
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
389+
require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55")
390+
require.Len(t, leftTopN, 1, "should have 1 left topN")
391+
}
392+
>>>>>>> 9d517f6b83e (statistics: add bench for MergeGlobalStatsTopNByConcurrency (#45998))

statistics/handle/handle.go

+12-8
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ const (
5858
// TiDBGlobalStats represents the global-stats for a partitioned table.
5959
TiDBGlobalStats = "global"
6060

61-
// maxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
62-
maxPartitionMergeBatchSize = 256
61+
// MaxPartitionMergeBatchSize indicates the max batch size for a worker to merge partition stats
62+
MaxPartitionMergeBatchSize = 256
6363
)
6464

6565
// Handle can update stats info periodically.
@@ -804,7 +804,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
804804
// These remaining topN numbers will be used as a separate bucket for later histogram merging.
805805
var popedTopN []statistics.TopNMeta
806806
wrapper := statistics.NewStatsWrapper(allHg[i], allTopN[i])
807-
globalStats.TopN[i], popedTopN, allHg[i], err = h.mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
807+
globalStats.TopN[i], popedTopN, allHg[i], err = mergeGlobalStatsTopN(sc, wrapper, sc.GetSessionVars().StmtCtx.TimeZone, sc.GetSessionVars().AnalyzeVersion, uint32(opts[ast.AnalyzeOptNumTopN]), isIndex == 1)
808808
if err != nil {
809809
return
810810
}
@@ -836,7 +836,7 @@ func (h *Handle) mergePartitionStats2GlobalStats(sc sessionctx.Context,
836836
return
837837
}
838838

839-
func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
839+
func mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics.StatsWrapper,
840840
timeZone *time.Location, version int, n uint32, isIndex bool) (*statistics.TopN,
841841
[]statistics.TopNMeta, []*statistics.Histogram, error) {
842842
mergeConcurrency := sc.GetSessionVars().AnalyzePartitionMergeConcurrency
@@ -848,17 +848,21 @@ func (h *Handle) mergeGlobalStatsTopN(sc sessionctx.Context, wrapper *statistics
848848
batchSize := len(wrapper.AllTopN) / mergeConcurrency
849849
if batchSize < 1 {
850850
batchSize = 1
851-
} else if batchSize > maxPartitionMergeBatchSize {
852-
batchSize = maxPartitionMergeBatchSize
851+
} else if batchSize > MaxPartitionMergeBatchSize {
852+
batchSize = MaxPartitionMergeBatchSize
853853
}
854-
return h.mergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed)
854+
return MergeGlobalStatsTopNByConcurrency(mergeConcurrency, batchSize, wrapper, timeZone, version, n, isIndex, killed)
855855
}
856856

857-
// mergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
857+
// MergeGlobalStatsTopNByConcurrency merge partition topN by concurrency
858858
// To merge global stats topn by concurrency, we will separate the partition topn in concurrency part and deal it with different worker.
859859
// mergeConcurrency is used to control the total concurrency of the running worker, and mergeBatchSize is used to control
860860
// the partition size for each worker to solve it
861+
<<<<<<< HEAD
861862
func (h *Handle) mergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
863+
=======
864+
func MergeGlobalStatsTopNByConcurrency(mergeConcurrency, mergeBatchSize int, wrapper *statistics.StatsWrapper,
865+
>>>>>>> 9d517f6b83e (statistics: add bench for MergeGlobalStatsTopNByConcurrency (#45998))
862866
timeZone *time.Location, version int, n uint32, isIndex bool, killed *uint32) (*statistics.TopN,
863867
[]statistics.TopNMeta, []*statistics.Histogram, error) {
864868
if len(wrapper.AllTopN) < mergeConcurrency {

0 commit comments

Comments
 (0)