Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: move MergePartTopN2GlobalTopN into handle/globalstats #47901

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ go_test(
data = glob(["testdata/**"]),
embed = [":statistics"],
flaky = True,
shard_count = 34,
shard_count = 32,
deps = [
"//pkg/config",
"//pkg/parser/ast",
Expand Down
80 changes: 0 additions & 80 deletions pkg/statistics/cmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"slices"
"sort"
"strings"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -795,84 +793,6 @@ func NewTopN(n int) *TopN {
return &TopN{TopN: make([]TopNMeta, 0, n)}
}

// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
// The input parameters:
// 1. `topNs` are the partition-level topNs to be merged.
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
//
// The output parameters:
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*TopN, n uint32, hists []*Histogram,
isIndex bool, killed *uint32) (*TopN, []TopNMeta, []*Histogram, error) {
if CheckEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
partNum := len(topNs)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
// datumMap is used to store the mapping from the string type to datum type.
// The datum is used to find the value in the histogram.
datumMap := NewDatumMapCache()
for i, topN := range topNs {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(ErrQueryInterrupted)
}
if topN.TotalCount() == 0 {
continue
}
for _, val := range topN.TopN {
encodedVal := hack.String(val.Encoded)
_, exists := counter[encodedVal]
counter[encodedVal] += float64(val.Count)
if exists {
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
continue
}
// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
// 1. Check the topN first.
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
for j := 0; j < partNum; j++ {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(ErrQueryInterrupted)
}
if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
continue
}
// Get the encodedVal from the hists[j]
datum, exists := datumMap.Get(encodedVal)
if !exists {
d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
if err != nil {
return nil, nil, nil, err
}
datum = d
}
// Get the row count which the value is equal to the encodedVal from histogram.
count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
if count != 0 {
counter[encodedVal] += count
// Remove the value corresponding to encodedVal from the histogram.
hists[j].BinarySearchRemoveVal(TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
}
}
}
}
numTop := len(counter)
if numTop == 0 {
return nil, nil, hists, nil
}
sorted := make([]TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
sorted = append(sorted, TopNMeta{Encoded: data, Count: uint64(cnt)})
}
globalTopN, leftTopN := GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, leftTopN, hists, nil
}

// MergeTopN is used to merge more TopN structures to generate a new TopN struct by the given size.
// The input parameters are multiple TopN structures to be merged and the size of the new TopN that will be generated.
// The output parameters are the newly generated TopN structure and the remaining numbers.
Expand Down
86 changes: 0 additions & 86 deletions pkg/statistics/cmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"math"
"math/rand"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -256,39 +254,6 @@ func TestCMSketchCodingTopN(t *testing.T) {
require.NoError(t, err)
}

func TestMergePartTopN2GlobalTopNWithoutHists(t *testing.T) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*TopN, 0, 10)
for i := 0; i < 10; i++ {
// Construct TopN, should be key(1, 1) -> 2, key(1, 2) -> 2, key(1, 3) -> 3.
topN := NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(1))
require.NoError(t, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(2))
require.NoError(t, err)
topN.AppendTopN(key2, 2)
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1), types.NewIntDatum(3))
require.NoError(t, err)
topN.AppendTopN(key3, 3)
}
topNs = append(topNs, topN)
}

// Test merge 2 topN with nil hists.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, nil, false, &isKilled)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(50), globalTopN.TotalCount(), "should have 50 rows")
require.Len(t, leftTopN, 1, "should have 1 left topN")
}

func TestSortTopnMeta(t *testing.T) {
data := []TopNMeta{{
Encoded: []byte("a"),
Expand All @@ -300,54 +265,3 @@ func TestSortTopnMeta(t *testing.T) {
SortTopnMeta(data)
require.Equal(t, uint64(2), data[0].Count)
}

func TestMergePartTopN2GlobalTopNWithHists(t *testing.T) {
loc := time.UTC
sc := stmtctx.NewStmtCtxWithTimeZone(loc)
version := 1
isKilled := uint32(0)

// Prepare TopNs.
topNs := make([]*TopN, 0, 10)
for i := 0; i < 10; i++ {
// Construct TopN, should be key1 -> 2, key2 -> 2, key3 -> 3.
topN := NewTopN(3)
{
key1, err := codec.EncodeKey(sc, nil, types.NewIntDatum(1))
require.NoError(t, err)
topN.AppendTopN(key1, 2)
key2, err := codec.EncodeKey(sc, nil, types.NewIntDatum(2))
require.NoError(t, err)
topN.AppendTopN(key2, 2)
if i%2 == 0 {
key3, err := codec.EncodeKey(sc, nil, types.NewIntDatum(3))
require.NoError(t, err)
topN.AppendTopN(key3, 3)
}
}
topNs = append(topNs, topN)
}

// Prepare Hists.
hists := make([]*Histogram, 0, 10)
for i := 0; i < 10; i++ {
// Construct Hist
h := NewHistogram(1, 10, 0, 0, types.NewFieldType(mysql.TypeTiny), chunk.InitialCapacity, 0)
h.Bounds.AppendInt64(0, 1)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 20})
h.Bounds.AppendInt64(0, 2)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 3)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 30})
h.Bounds.AppendInt64(0, 4)
h.Buckets = append(h.Buckets, Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}

// Test merge 2 topN.
globalTopN, leftTopN, _, err := MergePartTopN2GlobalTopN(loc, version, topNs, 2, hists, false, &isKilled)
require.NoError(t, err)
require.Len(t, globalTopN.TopN, 2, "should only have 2 topN")
require.Equal(t, uint64(55), globalTopN.TotalCount(), "should have 55")
require.Len(t, leftTopN, 1, "should have 1 left topN")
}
5 changes: 3 additions & 2 deletions pkg/statistics/handle/globalstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ go_test(
"globalstats_test.go",
"main_test.go",
"topn_bench_test.go",
"topn_test.go",
],
embed = [":globalstats"],
flaky = True,
shard_count = 19,
shard_count = 21,
deps = [
":globalstats",
"//pkg/config",
"//pkg/parser/model",
"//pkg/parser/mysql",
Expand Down
82 changes: 81 additions & 1 deletion pkg/statistics/handle/globalstats/topn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ package globalstats
import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/tiancaiamao/gp"
)

Expand All @@ -32,7 +34,7 @@ func mergeGlobalStatsTopN(gp *gp.Pool, sc sessionctx.Context, wrapper *StatsWrap
killed := &sc.GetSessionVars().Killed
// use original method if concurrency equals 1 or for version1
if mergeConcurrency < 2 {
return statistics.MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed)
return MergePartTopN2GlobalTopN(timeZone, version, wrapper.AllTopN, n, wrapper.AllHg, isIndex, killed)
}
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
Expand Down Expand Up @@ -113,3 +115,81 @@ func MergeGlobalStatsTopNByConcurrency(gp *gp.Pool, mergeConcurrency, mergeBatch
statistics.SortTopnMeta(result)
return globalTopN, result, wrapper.AllHg, nil
}

// MergePartTopN2GlobalTopN is used to merge the partition-level topN to global-level topN.
// The input parameters:
// 1. `topNs` are the partition-level topNs to be merged.
// 2. `n` is the size of the global-level topN. Notice: This value can be 0 and has no default value, we must explicitly specify this value.
// 3. `hists` are the partition-level histograms. Some values not in topN may be placed in the histogram. We need it here to make the value in the global-level TopN more accurate.
//
// The output parameters:
// 1. `*TopN` is the final global-level topN.
// 2. `[]TopNMeta` is the left topN value from the partition-level TopNs, but is not placed to global-level TopN. We should put them back to histogram latter.
// 3. `[]*Histogram` are the partition-level histograms which just delete some values when we merge the global-level topN.
func MergePartTopN2GlobalTopN(loc *time.Location, version int, topNs []*statistics.TopN, n uint32, hists []*statistics.Histogram,
isIndex bool, killed *uint32) (*statistics.TopN, []statistics.TopNMeta, []*statistics.Histogram, error) {
if statistics.CheckEmptyTopNs(topNs) {
return nil, nil, hists, nil
}
partNum := len(topNs)
// Different TopN structures may hold the same value, we have to merge them.
counter := make(map[hack.MutableString]float64)
// datumMap is used to store the mapping from the string type to datum type.
// The datum is used to find the value in the histogram.
datumMap := statistics.NewDatumMapCache()
for i, topN := range topNs {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted)
}
if topN.TotalCount() == 0 {
continue
}
for _, val := range topN.TopN {
encodedVal := hack.String(val.Encoded)
_, exists := counter[encodedVal]
counter[encodedVal] += float64(val.Count)
if exists {
// We have already calculated the encodedVal from the histogram, so just continue to next topN value.
continue
}
// We need to check whether the value corresponding to encodedVal is contained in other partition-level stats.
// 1. Check the topN first.
// 2. If the topN doesn't contain the value corresponding to encodedVal. We should check the histogram.
for j := 0; j < partNum; j++ {
if atomic.LoadUint32(killed) == 1 {
return nil, nil, nil, errors.Trace(statistics.ErrQueryInterrupted)
}
if (j == i && version >= 2) || topNs[j].FindTopN(val.Encoded) != -1 {
continue
}
// Get the encodedVal from the hists[j]
datum, exists := datumMap.Get(encodedVal)
if !exists {
d, err := datumMap.Put(val, encodedVal, hists[0].Tp.GetType(), isIndex, loc)
if err != nil {
return nil, nil, nil, err
}
datum = d
}
// Get the row count which the value is equal to the encodedVal from histogram.
count, _ := hists[j].EqualRowCount(nil, datum, isIndex)
if count != 0 {
counter[encodedVal] += count
// Remove the value corresponding to encodedVal from the histogram.
hists[j].BinarySearchRemoveVal(statistics.TopNMeta{Encoded: datum.GetBytes(), Count: uint64(count)})
}
}
}
}
numTop := len(counter)
if numTop == 0 {
return nil, nil, hists, nil
}
sorted := make([]statistics.TopNMeta, 0, numTop)
for value, cnt := range counter {
data := hack.Slice(string(value))
sorted = append(sorted, statistics.TopNMeta{Encoded: data, Count: uint64(cnt)})
}
globalTopN, leftTopN := statistics.GetMergedTopNFromSortedSlice(sorted, n)
return globalTopN, leftTopN, hists, nil
}
13 changes: 6 additions & 7 deletions pkg/statistics/handle/globalstats/topn_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package globalstats_test
package globalstats

import (
"fmt"
Expand All @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/globalstats"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/codec"
Expand Down Expand Up @@ -77,7 +76,7 @@ func benchmarkMergePartTopN2GlobalTopNWithHists(partitions int, b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = statistics.MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
_, _, _, _ = MergePartTopN2GlobalTopN(loc, version, topNs, 10, hists, false, &isKilled)
}
}

Expand Down Expand Up @@ -124,20 +123,20 @@ func benchmarkMergeGlobalStatsTopNByConcurrencyWithHists(partitions int, b *test
h.Buckets = append(h.Buckets, statistics.Bucket{Repeat: 10, Count: 40})
hists = append(hists, h)
}
wrapper := globalstats.NewStatsWrapper(hists, topNs)
wrapper := NewStatsWrapper(hists, topNs)
const mergeConcurrency = 4
batchSize := len(wrapper.AllTopN) / mergeConcurrency
if batchSize < 1 {
batchSize = 1
} else if batchSize > globalstats.MaxPartitionMergeBatchSize {
batchSize = globalstats.MaxPartitionMergeBatchSize
} else if batchSize > MaxPartitionMergeBatchSize {
batchSize = MaxPartitionMergeBatchSize
}
gpool := gp.New(mergeConcurrency, 5*time.Minute)
defer gpool.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Benchmark merge 10 topN.
_, _, _, _ = globalstats.MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
_, _, _, _ = MergeGlobalStatsTopNByConcurrency(gpool, mergeConcurrency, batchSize, wrapper, loc, version, 10, false, &isKilled)
}
}

Expand Down
Loading