From 1897a3d26b5bd0a4dad0ccf42a2dd20376d9329a Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Fri, 6 Dec 2024 17:40:44 +0800 Subject: [PATCH] This is an automated cherry-pick of #58048 Signed-off-by: ti-chi-bot --- pkg/statistics/handle/handletest/BUILD.bazel | 34 + .../handle/handletest/handle_test.go | 1520 +++++++++++++++++ pkg/statistics/handle/storage/read.go | 823 +++++++++ pkg/statistics/handle/util/BUILD.bazel | 49 + pkg/statistics/handle/util/util.go | 283 +++ 5 files changed, 2709 insertions(+) create mode 100644 pkg/statistics/handle/handletest/BUILD.bazel create mode 100644 pkg/statistics/handle/handletest/handle_test.go create mode 100644 pkg/statistics/handle/storage/read.go create mode 100644 pkg/statistics/handle/util/BUILD.bazel create mode 100644 pkg/statistics/handle/util/util.go diff --git a/pkg/statistics/handle/handletest/BUILD.bazel b/pkg/statistics/handle/handletest/BUILD.bazel new file mode 100644 index 0000000000000..2884f52b182f1 --- /dev/null +++ b/pkg/statistics/handle/handletest/BUILD.bazel @@ -0,0 +1,34 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "handletest_test", + timeout = "short", + srcs = [ + "handle_test.go", + "main_test.go", + ], + flaky = True, + race = "on", + shard_count = 34, + deps = [ + "//pkg/config", + "//pkg/domain", + "//pkg/parser/model", + "//pkg/planner/cardinality", + "//pkg/sessionctx/variable", + "//pkg/statistics", + "//pkg/statistics/handle", + "//pkg/statistics/handle/util", + "//pkg/testkit", + "//pkg/testkit/analyzehelper", + "//pkg/testkit/testsetup", + "//pkg/types", + "//pkg/util/collate", + "//pkg/util/mock", + "//pkg/util/ranger", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_stretchr_testify//require", + "@com_github_tikv_client_go_v2//oracle", + "@org_uber_go_goleak//:goleak", + ], +) diff --git a/pkg/statistics/handle/handletest/handle_test.go b/pkg/statistics/handle/handletest/handle_test.go new file mode 100644 index 0000000000000..2fea656d8ae92 --- /dev/null +++ b/pkg/statistics/handle/handletest/handle_test.go @@ -0,0 +1,1520 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package handletest
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/pingcap/failpoint"
+ "github.com/pingcap/tidb/pkg/config"
+ "github.com/pingcap/tidb/pkg/domain"
+ "github.com/pingcap/tidb/pkg/parser/model"
+ "github.com/pingcap/tidb/pkg/planner/cardinality"
+ "github.com/pingcap/tidb/pkg/sessionctx/variable"
+ "github.com/pingcap/tidb/pkg/statistics"
+ "github.com/pingcap/tidb/pkg/statistics/handle"
+ "github.com/pingcap/tidb/pkg/statistics/handle/util"
+ "github.com/pingcap/tidb/pkg/testkit"
+ "github.com/pingcap/tidb/pkg/testkit/analyzehelper"
+ "github.com/pingcap/tidb/pkg/types"
+ "github.com/pingcap/tidb/pkg/util/collate"
+ "github.com/pingcap/tidb/pkg/util/mock"
+ "github.com/pingcap/tidb/pkg/util/ranger"
+ "github.com/stretchr/testify/require"
+ "github.com/tikv/client-go/v2/oracle"
+)
+
+func TestEmptyTable(t *testing.T) {
+ store, dom := testkit.CreateMockStoreAndDomain(t)
+ testKit := testkit.NewTestKit(t, store)
+ testKit.MustExec("use test")
+ testKit.MustExec("create table t (c1 int, c2 int, key cc1(c1), key cc2(c2))")
+ testKit.MustExec("analyze table t")
+ do := dom
+ is := do.InfoSchema()
+ tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
+ require.NoError(t, err)
+ tableInfo := tbl.Meta()
+ statsTbl := do.StatsHandle().GetTableStats(tableInfo)
+ count := cardinality.ColumnGreaterRowCount(mock.NewContext(), statsTbl, types.NewDatum(1), tableInfo.Columns[0].ID)
+ require.Equal(t, 0.0, count)
+}
+
+func TestColumnIDs(t *testing.T) {
+ store, dom := testkit.CreateMockStoreAndDomain(t)
+ testKit := testkit.NewTestKit(t, store)
+ testKit.MustExec("use test")
+ testKit.MustExec("create table t (c1 int, c2 int)")
+ testKit.MustExec("insert into t values(1, 2)")
+ testKit.MustExec("analyze table t all columns")
+ do := dom
+ is := do.InfoSchema()
+ tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
+ require.NoError(t, err)
+ tableInfo := tbl.Meta()
+ statsTbl := do.StatsHandle().GetTableStats(tableInfo)
+ sctx := mock.NewContext()
+ ran := &ranger.Range{
+ LowVal: []types.Datum{types.MinNotNullDatum()},
+ HighVal: []types.Datum{types.NewIntDatum(2)},
+ LowExclude: false,
+ HighExclude: true,
+ Collators: collate.GetBinaryCollatorSlice(1),
+ }
+ count, err := cardinality.GetRowCountByColumnRanges(sctx, &statsTbl.HistColl, tableInfo.Columns[0].ID, []*ranger.Range{ran})
+ require.NoError(t, err)
+ require.Equal(t, float64(1), count)
+
+ // Drop a column so the column offsets change.
+ testKit.MustExec("alter table t drop column c1")
+ is = do.InfoSchema()
+ do.StatsHandle().Clear()
+ err = do.StatsHandle().Update(context.Background(), is)
+ require.NoError(t, err)
+ tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
+ require.NoError(t, err)
+ tableInfo = tbl.Meta()
+ statsTbl = do.StatsHandle().GetTableStats(tableInfo)
+ // After that, we should get c2's stats instead of c1's.
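+ // tableInfo.Columns[0] now refers to c2, so the same lookup below hits c2's histogram.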
+ count, err = cardinality.GetRowCountByColumnRanges(sctx, &statsTbl.HistColl, tableInfo.Columns[0].ID, []*ranger.Range{ran})
+ require.NoError(t, err)
+ require.Equal(t, 1.0, count)
+}
+
+func TestDurationToTS(t *testing.T) {
+ tests := []time.Duration{time.Millisecond, time.Second, time.Minute, time.Hour}
+ for _, test := range tests {
+ ts := util.DurationToTS(test)
+ require.Equal(t, int64(test), oracle.ExtractPhysical(ts)*int64(time.Millisecond))
+ }
+}
+
+func TestVersion(t *testing.T) {
+ store, dom := testkit.CreateMockStoreAndDomain(t)
+ testKit2 := testkit.NewTestKit(t, store)
+ testKit := testkit.NewTestKit(t, store)
+ testKit.MustExec("use test")
+ testKit.MustExec("create table t1 (c1 int, c2 int)")
+ testKit.MustExec("analyze table t1 all columns")
+ do := dom
+ is := do.InfoSchema()
+ tbl1, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t1"))
+ require.NoError(t, err)
+ tableInfo1 := tbl1.Meta()
+ h, err := handle.NewHandle(
+ testKit.Session(),
+ testKit2.Session(),
+ time.Millisecond,
+ is,
+ do.SysSessionPool(),
+ do.SysProcTracker(),
+ do.DDLNotifier(),
+ do.NextConnID,
+ do.ReleaseConnID,
+ )
+ defer func() {
+ h.Close()
+ }()
+ require.NoError(t, err)
+ unit := oracle.ComposeTS(1, 0)
+ testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", 2*unit, tableInfo1.ID)
+
+ require.NoError(t, h.Update(context.Background(), is))
+ require.Equal(t, 2*unit, h.MaxTableStatsVersion())
+ statsTbl1 := h.GetTableStats(tableInfo1)
+ require.False(t, statsTbl1.Pseudo)
+
+ testKit.MustExec("create table t2 (c1 int, c2 int)")
+ testKit.MustExec("analyze table t2 all columns")
+ is = do.InfoSchema()
+ tbl2, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t2"))
+ require.NoError(t, err)
+ tableInfo2 := tbl2.Meta()
+ // A write with a smaller version arrives, and we can still read it.
+ testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", unit, tableInfo2.ID)
+ require.NoError(t, h.Update(context.Background(), is))
+ require.Equal(t, 2*unit, h.MaxTableStatsVersion())
+ statsTbl2 := h.GetTableStats(tableInfo2)
+ require.False(t, statsTbl2.Pseudo)
+
+ testKit.MustExec("insert t1 values(1,2)")
+ testKit.MustExec("analyze table t1")
+ offset := 3 * unit
+ testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", offset+4, tableInfo1.ID)
+ require.NoError(t, h.Update(context.Background(), is))
+ require.Equal(t, offset+uint64(4), h.MaxTableStatsVersion())
+ statsTbl1 = h.GetTableStats(tableInfo1)
+ require.Equal(t, int64(1), statsTbl1.RealtimeCount)
+
+ testKit.MustExec("insert t2 values(1,2)")
+ testKit.MustExec("analyze table t2")
+ // A write with a smaller version arrives, and we can still read it.
+ testKit.MustExec("update mysql.stats_meta set version = ? where table_id = ?", offset+3, tableInfo2.ID)
+ require.NoError(t, h.Update(context.Background(), is))
+ require.Equal(t, offset+uint64(4), h.MaxTableStatsVersion())
+ statsTbl2 = h.GetTableStats(tableInfo2)
+ require.Equal(t, int64(1), statsTbl2.RealtimeCount)
+
+ testKit.MustExec("insert t2 values(1,2)")
+ testKit.MustExec("analyze table t2")
+ // A write with a smaller version arrives, but this time we cannot read it: the handle now only reads versions greater than 4 (the max version minus three units).
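+ // Consequently the cached max version and t2's cached row count stay unchanged below.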
+ testKit.MustExec("update mysql.stats_meta set version = 1 where table_id = ?", tableInfo2.ID) + require.NoError(t, h.Update(context.Background(), is)) + require.Equal(t, offset+uint64(4), h.MaxTableStatsVersion()) + statsTbl2 = h.GetTableStats(tableInfo2) + require.Equal(t, int64(1), statsTbl2.RealtimeCount) + + // We add an index and analyze it, but DDL doesn't load. + testKit.MustExec("alter table t2 add column c3 int") + testKit.MustExec("analyze table t2 all columns") + // load it with old schema. + require.NoError(t, h.Update(context.Background(), is)) + statsTbl2 = h.GetTableStats(tableInfo2) + require.False(t, statsTbl2.Pseudo) + require.Nil(t, statsTbl2.GetCol(int64(3))) + // Next time DDL updated. + is = do.InfoSchema() + require.NoError(t, h.Update(context.Background(), is)) + statsTbl2 = h.GetTableStats(tableInfo2) + require.False(t, statsTbl2.Pseudo) + require.Nil(t, statsTbl2.GetCol(int64(3))) + tbl2, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t2")) + require.NoError(t, err) + tableInfo2 = tbl2.Meta() + statsTbl2, err = h.TableStatsFromStorage(tableInfo2, tableInfo2.ID, true, 0) + require.NoError(t, err) + require.NotNil(t, statsTbl2.GetCol(int64(3))) +} + +func TestLoadHist(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("create table t (c1 varchar(12), c2 char(12))") + do := dom + h := do.StatsHandle() + err := h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) + rowCount := 10 + for i := 0; i < rowCount; i++ { + testKit.MustExec("insert into t values('a','ddd')") + } + testKit.MustExec("analyze table t") + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + oldStatsTbl := h.GetTableStats(tableInfo) + for i := 0; i < rowCount; i++ { + testKit.MustExec("insert into t values('bb','sdfga')") + } + require.NoError(t, h.DumpStatsDeltaToKV(true)) + err = h.Update(context.Background(), do.InfoSchema()) + require.NoError(t, err) + newStatsTbl := h.GetTableStats(tableInfo) + // The stats table is updated. + require.False(t, oldStatsTbl == newStatsTbl) + // Only the TotColSize of histograms is updated. + oldStatsTbl.ForEachColumnImmutable(func(id int64, hist *statistics.Column) bool { + require.Less(t, hist.TotColSize, newStatsTbl.GetCol(id).TotColSize) + + temp := hist.TotColSize + hist.TotColSize = newStatsTbl.GetCol(id).TotColSize + require.True(t, statistics.HistogramEqual(&hist.Histogram, &newStatsTbl.GetCol(id).Histogram, false)) + hist.TotColSize = temp + + require.True(t, hist.CMSketch.Equal(newStatsTbl.GetCol(id).CMSketch)) + require.Equal(t, newStatsTbl.GetCol(id).Info, hist.Info) + return false + }) + // Add column c3, we only update c3. + testKit.MustExec("alter table t add column c3 int") + err = h.HandleDDLEvent(<-h.DDLEventCh()) + require.NoError(t, err) + is = do.InfoSchema() + tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo = tbl.Meta() + require.NoError(t, h.Update(context.Background(), is)) + newStatsTbl2 := h.GetTableStats(tableInfo) + require.False(t, newStatsTbl2 == newStatsTbl) + // The histograms is not updated. 
+ newStatsTbl.ForEachColumnImmutable(func(id int64, hist *statistics.Column) bool { + require.Equal(t, newStatsTbl2.GetCol(id), hist) + return false + }) + require.Greater(t, newStatsTbl2.GetCol(3).LastUpdateVersion, newStatsTbl2.GetCol(1).LastUpdateVersion) +} + +func TestCorrelation(t *testing.T) { + store := testkit.CreateMockStore(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("create table t(c1 int primary key, c2 int)") + testKit.MustExec("select * from t where c1 > 10 and c2 > 10") + testKit.MustExec("insert into t values(1,1),(3,12),(4,20),(2,7),(5,21)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result := testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + testKit.MustExec("insert into t values(8,18)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0", result.Rows()[0][9]) + require.Equal(t, "0.8285714285714286", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "0.8285714285714286", result.Rows()[1][9]) + + testKit.MustExec("truncate table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 0) + testKit.MustExec("insert into t values(1,21),(3,12),(4,7),(2,20),(5,1)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0", result.Rows()[0][9]) + require.Equal(t, "-1", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "-1", result.Rows()[1][9]) + testKit.MustExec("insert into t values(8,4)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0", result.Rows()[0][9]) + require.Equal(t, "-0.9428571428571428", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "-0.9428571428571428", result.Rows()[1][9]) + + testKit.MustExec("truncate table t") + testKit.MustExec("insert into t values 
(1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1),(10,1),(11,1),(12,1),(13,1),(14,1),(15,1),(16,1),(17,1),(18,1),(19,1),(20,2),(21,2),(22,2),(23,2),(24,2),(25,2)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + + testKit.MustExec("drop table t") + testKit.MustExec("create table t(c1 int, c2 int)") + testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(4,20),(5,21),(8,18)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "0.8285714285714286", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "0.8285714285714286", result.Rows()[1][9]) + + testKit.MustExec("truncate table t") + testKit.MustExec("insert into t values(1,1),(2,7),(3,12),(8,18),(4,20),(5,21)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0.8285714285714286", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + testKit.MustExec("set @@session.tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't'").Sort() + require.Len(t, result.Rows(), 2) + require.Equal(t, "0.8285714285714286", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + + testKit.MustExec("drop table t") + testKit.MustExec("create table t(c1 int primary key, c2 int, c3 int, key idx_c2(c2))") + testKit.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3)") + testKit.MustExec("set @@session.tidb_analyze_version=1") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 0").Sort() + require.Len(t, result.Rows(), 3) + require.Equal(t, "0", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + require.Equal(t, "1", result.Rows()[2][9]) + result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 1").Sort() + require.Len(t, result.Rows(), 1) + require.Equal(t, "0", result.Rows()[0][9]) + testKit.MustExec("set @@tidb_analyze_version=2") + testKit.MustExec("analyze table t") + result = testKit.MustQuery("show stats_histograms where Table_name = 't' and Is_index = 0").Sort() + require.Len(t, result.Rows(), 3) + require.Equal(t, "1", result.Rows()[0][9]) + require.Equal(t, "1", result.Rows()[1][9]) + require.Equal(t, "1", result.Rows()[2][9]) + result = testKit.MustQuery("show stats_histograms where Table_name = 't' and 
Is_index = 1").Sort() + require.Len(t, result.Rows(), 1) + require.Equal(t, "0", result.Rows()[0][9]) +} + +func TestMergeGlobalTopN(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t;") + tk.MustExec("set @@session.tidb_analyze_version=2;") + tk.MustExec("set @@session.tidb_partition_prune_mode='dynamic';") + tk.MustExec(`create table t (a int, b int, key(b)) partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20) + );`) + tk.MustExec("insert into t values(1, 1), (1, 1), (1, 1), (1, 1), (2, 2), (2, 2), (3, 3), (3, 3), (3, 3), " + + "(11, 11), (11, 11), (11, 11), (12, 12), (12, 12), (12, 12), (13, 3), (13, 3);") + tk.MustExec("analyze table t with 2 topn;") + // The top2 values in partition p0 are 1(count = 4) and 3(count = 3). + tk.MustQuery("show stats_topn where table_name = 't' and column_name = 'b' and partition_name = 'p0';").Check(testkit.Rows( + ("test t p0 b 0 1 4"), + ("test t p0 b 0 3 3"), + ("test t p0 b 1 1 4"), + ("test t p0 b 1 3 3"))) + // The top2 values in partition p1 are 11(count = 3) and 12(count = 3). + tk.MustQuery("show stats_topn where table_name = 't' and column_name = 'b' and partition_name = 'p1';").Check(testkit.Rows( + ("test t p1 b 0 11 3"), + ("test t p1 b 0 12 3"), + ("test t p1 b 1 11 3"), + ("test t p1 b 1 12 3"))) + // The top2 values in global are 1(count = 4) and 3(count = 5). + // Notice: The value 3 does not appear in the topN structure of partition one. + // But we can still use the histogram to calculate its accurate value. + tk.MustQuery("show stats_topn where table_name = 't' and column_name = 'b' and partition_name = 'global';").Check(testkit.Rows( + ("test t global b 0 1 4"), + ("test t global b 0 3 5"), + ("test t global b 1 1 4"), + ("test t global b 1 3 5"))) +} + +func TestExtendedStatsOps(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int primary key, b int, c int, d int)") + tk.MustExec("insert into t values(1,1,5,1),(2,2,4,2),(3,3,3,3),(4,4,2,4),(5,5,1,5)") + tk.MustExec("analyze table t") + err := tk.ExecToErr("alter table not_exist_db.t add stats_extended s1 correlation(b,c)") + require.Equal(t, "[schema:1146]Table 'not_exist_db.t' doesn't exist", err.Error()) + err = tk.ExecToErr("alter table not_exist_tbl add stats_extended s1 correlation(b,c)") + require.Equal(t, "[schema:1146]Table 'test.not_exist_tbl' doesn't exist", err.Error()) + err = tk.ExecToErr("alter table t add stats_extended s1 correlation(b,e)") + require.Equal(t, "[schema:1054]Unknown column 'e' in 't'", err.Error()) + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustQuery("show warnings").Check(testkit.Rows( + "Warning 1105 No need to create correlation statistics on the integer primary key column", + )) + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows()) + err = tk.ExecToErr("alter table t add stats_extended s1 correlation(b,c,d)") + require.Equal(t, "Only support Correlation and Dependency statistics types on 2 columns", err.Error()) + + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows()) + tk.MustExec("alter table t add stats_extended s1 correlation(b,c)") + tk.MustQuery("select 
type, column_ids, stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "2 [2,3] 0", + )) + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 0) + + tk.MustExec("update mysql.stats_extended set status = 1 where name = 's1'") + do.StatsHandle().Clear() + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 1) + + tk.MustExec("alter table t drop stats_extended s1") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "2 [2,3] 2", + )) + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 0) +} + +func TestAdminReloadStatistics1(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int primary key, b int, c int, d int)") + tk.MustExec("insert into t values(1,1,5,1),(2,2,4,2),(3,3,3,3),(4,4,2,4),(5,5,1,5)") + tk.MustExec("analyze table t") + tk.MustExec("alter table t add stats_extended s1 correlation(b,c)") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "2 [2,3] 0", + )) + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 0) + + tk.MustExec("update mysql.stats_extended set status = 1 where name = 's1'") + do.StatsHandle().Clear() + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 1) + + tk.MustExec("delete from mysql.stats_extended where name = 's1'") + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 1) + + tk.MustExec("admin reload stats_extended") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 0) +} + +func TestAdminReloadStatistics2(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + 
tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + tk.MustQuery("select stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "1.000000 1", + )) + rows := tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + + tk.MustExec("delete from mysql.stats_extended where name = 's1'") + is := dom.InfoSchema() + dom.StatsHandle().Update(context.Background(), is) + tk.MustQuery("select stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows()) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + + tk.MustExec("admin reload stats_extended") + tk.MustQuery("select stats, status from mysql.stats_extended where name = 's1'").Check(testkit.Rows()) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 0) +} + +func TestCorrelationStatsCompute(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c int, index idx(a, b, c))") + tk.MustExec("insert into t values(1,1,5),(2,2,4),(3,3,3),(4,4,2),(5,5,1)") + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Check(testkit.Rows()) + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("alter table t add stats_extended s2 correlation(a,c)") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 0", + "2 [1,3] 0", + )) + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 0) + + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 1.000000 1", + "2 [1,3] -1.000000 1", + )) + tk.MustExec("set @@session.tidb_analyze_version=2") + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 1.000000 1", + "2 [1,3] -1.000000 1", + )) + err = do.StatsHandle().Update(context.Background(), is) + require.NoError(t, err) + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 2) + foundS1, foundS2 := false, false + for name, item := range statsTbl.ExtendedStats.Stats { + switch name { + case "s1": + foundS1 = true + require.Equal(t, float64(1), item.ScalarVals) + case "s2": + foundS2 = true + require.Equal(t, float64(-1), item.ScalarVals) + default: + require.FailNow(t, "Unexpected extended stats in cache") + } + } + require.True(t, foundS1 && foundS2) + + // Check that table with NULLs won't cause panic + tk.MustExec("delete from t") + tk.MustExec("insert into t values(1,null,2), (2,null,null)") + tk.MustExec("set 
@@session.tidb_analyze_version=1") + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 0.000000 1", + "2 [1,3] 1.000000 1", + )) + tk.MustExec("set @@session.tidb_analyze_version=2") + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 0.000000 1", + "2 [1,3] 1.000000 1", + )) + tk.MustExec("insert into t values(3,3,3)") + tk.MustExec("set @@session.tidb_analyze_version=1") + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 1.000000 1", + "2 [1,3] 1.000000 1", + )) + tk.MustExec("set @@session.tidb_analyze_version=2") + tk.MustExec("analyze table t") + tk.MustQuery("select type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( + "2 [1,2] 1.000000 1", + "2 [1,3] 1.000000 1", + )) +} + +func TestSyncStatsExtendedRemoval(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + do := dom + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + statsTbl := do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 1) + item := statsTbl.ExtendedStats.Stats["s1"] + require.NotNil(t, item) + result := tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 1) + + tk.MustExec("alter table t drop stats_extended s1") + statsTbl = do.StatsHandle().GetTableStats(tableInfo) + require.NotNil(t, statsTbl) + require.NotNil(t, statsTbl.ExtendedStats) + require.Len(t, statsTbl.ExtendedStats.Stats, 0) + result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 0) +} + +func TestStaticPartitionPruneMode(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_partition_prune_mode='" + string(variable.Static) + "'") + tk.MustExec("use test") + tk.MustExec(`create table t (a int, key(a)) partition by range(a) + (partition p0 values less than (10), + partition p1 values less than (22))`) + tk.MustExec(`insert into t values (1), (2), (3), (10), (11)`) + tk.MustExec(`analyze table t`) + tk.MustNoGlobalStats("t") + tk.MustExec("set @@tidb_partition_prune_mode='" + string(variable.Dynamic) + "'") + tk.MustNoGlobalStats("t") + + tk.MustExec("set @@tidb_partition_prune_mode='" + string(variable.Static) + "'") + tk.MustExec(`insert into t values (4), (5), (6)`) + tk.MustExec(`analyze table t partition p0`) + tk.MustNoGlobalStats("t") + tk.MustExec("set @@tidb_partition_prune_mode='" + string(variable.Dynamic) + "'") + tk.MustNoGlobalStats("t") + tk.MustExec("set @@tidb_partition_prune_mode='" + string(variable.Static) + "'") +} + +func TestMergeIdxHist(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set 
@@tidb_partition_prune_mode='" + string(variable.Dynamic) + "'") + defer tk.MustExec("set @@tidb_partition_prune_mode='" + string(variable.Static) + "'") + tk.MustExec("use test") + tk.MustExec(` + create table t (a int, key(a)) + partition by range (a) ( + partition p0 values less than (10), + partition p1 values less than (20))`) + tk.MustExec("set @@tidb_analyze_version=2") + defer tk.MustExec("set @@tidb_analyze_version=1") + tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (6), (null), (11), (12), (13), (14), (15), (16), (17), (18), (19), (19)") + + tk.MustExec("analyze table t with 2 topn, 2 buckets") + rows := tk.MustQuery("show stats_buckets where partition_name like 'global'") + require.Len(t, rows.Rows(), 4) +} + +func TestPartitionPruneModeSessionVariable(t *testing.T) { + failpoint.Enable("github.com/pingcap/tidb/pkg/planner/core/forceDynamicPrune", `return(true)`) + defer failpoint.Disable("github.com/pingcap/tidb/pkg/planner/core/forceDynamicPrune") + + store := testkit.CreateMockStore(t) + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("set tidb_cost_model_version=1") + tk1.MustExec("set @@tidb_partition_prune_mode = '" + string(variable.Dynamic) + "'") + tk1.MustExec(`set @@tidb_analyze_version=2`) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("set tidb_cost_model_version=1") + tk2.MustExec("set @@tidb_partition_prune_mode = '" + string(variable.Static) + "'") + tk2.MustExec(`set @@tidb_analyze_version=2`) + + tk1.MustExec(`create table t (a int, key(a)) partition by range(a) + (partition p0 values less than (10), + partition p1 values less than (22))`) + + tk1.MustQuery("explain format = 'brief' select * from t").Check(testkit.Rows( + "TableReader 10000.00 root partition:all data:TableFullScan", + "└─TableFullScan 10000.00 cop[tikv] table:t keep order:false, stats:pseudo", + )) + tk2.MustQuery("explain format = 'brief' select * from t").Check(testkit.Rows( + "PartitionUnion 20000.00 root ", + "├─TableReader 10000.00 root data:TableFullScan", + "│ └─TableFullScan 10000.00 cop[tikv] table:t, partition:p0 keep order:false, stats:pseudo", + "└─TableReader 10000.00 root data:TableFullScan", + " └─TableFullScan 10000.00 cop[tikv] table:t, partition:p1 keep order:false, stats:pseudo", + )) + + tk1.MustExec(`insert into t values (1), (2), (3), (10), (11)`) + tk1.MustExec(`analyze table t with 1 topn, 2 buckets`) + tk1.MustQuery("explain format = 'brief' select * from t").Check(testkit.Rows( + "TableReader 5.00 root partition:all data:TableFullScan", + "└─TableFullScan 5.00 cop[tikv] table:t keep order:false", + )) + tk2.MustQuery("explain format = 'brief' select * from t").Check(testkit.Rows( + "PartitionUnion 5.00 root ", + "├─TableReader 3.00 root data:TableFullScan", + "│ └─TableFullScan 3.00 cop[tikv] table:t, partition:p0 keep order:false", + "└─TableReader 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tikv] table:t, partition:p1 keep order:false", + )) + + tk1.MustExec("set @@tidb_partition_prune_mode = '" + string(variable.Static) + "'") + tk1.MustQuery("explain format = 'brief' select * from t").Check(testkit.Rows( + "PartitionUnion 5.00 root ", + "├─TableReader 3.00 root data:TableFullScan", + "│ └─TableFullScan 3.00 cop[tikv] table:t, partition:p0 keep order:false", + "└─TableReader 2.00 root data:TableFullScan", + " └─TableFullScan 2.00 cop[tikv] table:t, partition:p1 keep order:false", + )) + tk2.MustExec("set @@tidb_partition_prune_mode = '" + string(variable.Dynamic) + 
"'") + tk2.MustQuery("explain format = 'brief' select * from t").Check(testkit.Rows( + "TableReader 5.00 root partition:all data:TableFullScan", + "└─TableFullScan 5.00 cop[tikv] table:t keep order:false", + )) +} + +func TestRepetitiveAddDropExtendedStats(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustQuery("select name, status from mysql.stats_extended where name = 's1'").Sort().Check(testkit.Rows( + "s1 0", + )) + result := tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 0) + tk.MustExec("analyze table t") + tk.MustQuery("select name, status from mysql.stats_extended where name = 's1'").Sort().Check(testkit.Rows( + "s1 1", + )) + result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 1) + tk.MustExec("alter table t drop stats_extended s1") + tk.MustQuery("select name, status from mysql.stats_extended where name = 's1'").Sort().Check(testkit.Rows( + "s1 2", + )) + result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 0) + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustQuery("select name, status from mysql.stats_extended where name = 's1'").Sort().Check(testkit.Rows( + "s1 0", + )) + result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 0) + tk.MustExec("analyze table t") + tk.MustQuery("select name, status from mysql.stats_extended where name = 's1'").Sort().Check(testkit.Rows( + "s1 1", + )) + result = tk.MustQuery("show stats_extended where db_name = 'test' and table_name = 't'") + require.Len(t, result.Rows(), 1) +} + +func TestDuplicateFMSketch(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + defer tk.MustExec("set @@tidb_partition_prune_mode='static'") + tk.MustExec("create table t(a int, b int, c int) partition by hash(a) partitions 3") + tk.MustExec("insert into t values (1, 1, 1)") + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b", "c") + tk.MustExec("analyze table t") + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("9")) + tk.MustExec("analyze table t") + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("9")) + + tk.MustExec("alter table t drop column b") + require.NoError(t, dom.StatsHandle().GCStats(dom.InfoSchema(), time.Duration(0))) + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6")) +} + +func TestIndexFMSketch(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@session.tidb_analyze_version = 1") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int, c int, index ia(a), index ibc(b, c)) partition by hash(a) partitions 3") + tk.MustExec("insert into t values (1, 1, 1)") + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + defer tk.MustExec("set @@tidb_partition_prune_mode='static'") + 
tk.MustExec("analyze table t index ia") + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("3")) + tk.MustExec("analyze table t index ibc") + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6")) + tk.MustExec("analyze table t") + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("15")) + tk.MustExec("drop table if exists t") + require.NoError(t, dom.StatsHandle().GCStats(dom.InfoSchema(), 0)) + + // clustered index + tk.MustExec("drop table if exists t") + tk.MustExec("set @@tidb_enable_clustered_index=ON") + tk.MustExec("create table t (a datetime, b datetime, primary key (a)) partition by hash(year(a)) partitions 3") + tk.MustExec("insert into t values ('2000-01-01', '2000-01-01')") + tk.MustExec("analyze table t") + tk.MustQuery("select count(*) from mysql.stats_fm_sketch").Check(testkit.Rows("6")) + tk.MustExec("drop table if exists t") + require.NoError(t, dom.StatsHandle().GCStats(dom.InfoSchema(), 0)) + + // test NDV + checkNDV := func(rows, ndv int) { + tk.MustExec("analyze table t") + rs := tk.MustQuery("select value from mysql.stats_fm_sketch").Rows() + require.Len(t, rs, rows) + for i := range rs { + fm, err := statistics.DecodeFMSketch([]byte(rs[i][0].(string))) + require.NoError(t, err) + require.Equal(t, int64(ndv), fm.NDV()) + } + } + + tk.MustExec("set @@tidb_enable_clustered_index=OFF") + tk.MustExec("create table t(a int, key(a)) partition by hash(a) partitions 3") + tk.MustExec("insert into t values (1), (2), (2), (3)") + checkNDV(6, 1) + tk.MustExec("insert into t values (4), (5), (6)") + checkNDV(6, 2) + tk.MustExec("insert into t values (2), (5)") + checkNDV(6, 2) + tk.MustExec("drop table if exists t") + require.NoError(t, dom.StatsHandle().GCStats(dom.InfoSchema(), 0)) + + // clustered index + tk.MustExec("set @@tidb_enable_clustered_index=ON") + tk.MustExec("create table t (a datetime, b datetime, primary key (a)) partition by hash(year(a)) partitions 3") + tk.MustExec("insert into t values ('2000-01-01', '2001-01-01'), ('2001-01-01', '2001-01-01'), ('2002-01-01', '2001-01-01')") + checkNDV(6, 1) + tk.MustExec("insert into t values ('1999-01-01', '1998-01-01'), ('1997-01-02', '1999-01-02'), ('1998-01-03', '1999-01-03')") + checkNDV(6, 2) +} + +func TestShowExtendedStats4DropColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c int)") + tk.MustExec("insert into t values(1,1,1),(2,2,2),(3,3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("alter table t add stats_extended s2 correlation(b,c)") + tk.MustExec("analyze table t") + rows := tk.MustQuery("show stats_extended").Sort().Rows() + require.Len(t, rows, 2) + require.Equal(t, "s1", rows[0][2]) + require.Equal(t, "[a,b]", rows[0][3]) + require.Equal(t, "s2", rows[1][2]) + require.Equal(t, "[b,c]", rows[1][3]) + + tk.MustExec("alter table t drop column b") + rows = tk.MustQuery("show stats_extended").Rows() + require.Len(t, rows, 0) + + // Previously registered extended stats should be invalid for re-created columns. 
+ tk.MustExec("alter table t add column b int") + rows = tk.MustQuery("show stats_extended").Rows() + require.Len(t, rows, 0) +} + +func TestExtStatsOnReCreatedTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + rows := tk.MustQuery("select table_id, stats from mysql.stats_extended where name = 's1'").Rows() + require.Len(t, rows, 1) + tableID1 := rows[0][0] + require.Equal(t, "1.000000", rows[0][1]) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "1.000000", rows[0][5]) + + tk.MustExec("drop table t") + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 0) + + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,3),(2,2),(3,1)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + rows = tk.MustQuery("select table_id, stats from mysql.stats_extended where name = 's1' order by stats").Rows() + require.Len(t, rows, 2) + tableID2 := rows[0][0] + require.NotEqual(t, tableID1, tableID2) + require.Equal(t, tableID1, rows[1][0]) + require.Equal(t, "-1.000000", rows[0][1]) + require.Equal(t, "1.000000", rows[1][1]) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "-1.000000", rows[0][5]) +} + +func TestExtStatsOnReCreatedColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + tk.MustQuery("select column_ids, stats from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "[1,2] 1.000000", + )) + rows := tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "[a,b]", rows[0][3]) + require.Equal(t, "1.000000", rows[0][5]) + + tk.MustExec("alter table t drop column b") + tk.MustExec("alter table t add column b int") + tk.MustQuery("select * from t").Sort().Check(testkit.Rows( + "1 ", + "2 ", + "3 ", + )) + tk.MustExec("update t set b = 3 where a = 1") + tk.MustExec("update t set b = 2 where a = 2") + tk.MustExec("update t set b = 1 where a = 3") + tk.MustQuery("select * from t").Sort().Check(testkit.Rows( + "1 3", + "2 2", + "3 1", + )) + tk.MustExec("analyze table t") + // Previous extended stats would not be collected and would not take effect anymore, it will be removed by stats GC. 
+ tk.MustQuery("select column_ids, stats from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "[1,2] 1.000000", + )) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 0) +} + +func TestExtStatsOnRenamedColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + tk.MustQuery("select column_ids, stats from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "[1,2] 1.000000", + )) + rows := tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "[a,b]", rows[0][3]) + require.Equal(t, "1.000000", rows[0][5]) + + tk.MustExec("alter table t rename column b to c") + tk.MustExec("update t set c = 3 where a = 1") + tk.MustExec("update t set c = 2 where a = 2") + tk.MustExec("update t set c = 1 where a = 3") + tk.MustQuery("select * from t").Sort().Check(testkit.Rows( + "1 3", + "2 2", + "3 1", + )) + tk.MustExec("analyze table t") + // Previous extended stats would still be collected and take effect. + tk.MustQuery("select column_ids, stats from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "[1,2] -1.000000", + )) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "[a,c]", rows[0][3]) + require.Equal(t, "-1.000000", rows[0][5]) +} + +func TestExtStatsOnModifiedColumn(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set session tidb_enable_extended_stats = on") + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int)") + tk.MustExec("insert into t values(1,1),(2,2),(3,3)") + tk.MustExec("alter table t add stats_extended s1 correlation(a,b)") + tk.MustExec("analyze table t") + tk.MustQuery("select column_ids, stats from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "[1,2] 1.000000", + )) + rows := tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "[a,b]", rows[0][3]) + require.Equal(t, "1.000000", rows[0][5]) + + tk.MustExec("alter table t modify column b bigint") + tk.MustExec("update t set b = 3 where a = 1") + tk.MustExec("update t set b = 2 where a = 2") + tk.MustExec("update t set b = 1 where a = 3") + tk.MustQuery("select * from t").Sort().Check(testkit.Rows( + "1 3", + "2 2", + "3 1", + )) + tk.MustExec("analyze table t") + // Previous extended stats would still be collected and take effect. 
+ tk.MustQuery("select column_ids, stats from mysql.stats_extended where name = 's1'").Check(testkit.Rows( + "[1,2] -1.000000", + )) + rows = tk.MustQuery("show stats_extended where stats_name = 's1'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "[a,b]", rows[0][3]) + require.Equal(t, "-1.000000", rows[0][5]) +} + +func TestCorrelationWithDefinedCollate(t *testing.T) { + store := testkit.CreateMockStore(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("create table t(a int primary key, b varchar(8) character set utf8mb4 collate utf8mb4_general_ci, c varchar(8) character set utf8mb4 collate utf8mb4_bin)") + testKit.MustExec("insert into t values(1,'aa','aa'),(2,'Cb','Cb'),(3,'CC','CC')") + analyzehelper.TriggerPredicateColumnsCollection(t, testKit, store, "t", "a", "b", "c") + testKit.MustExec("analyze table t") + testKit.MustQuery("select a from t order by b").Check(testkit.Rows( + "1", + "2", + "3", + )) + testKit.MustQuery("select a from t order by c").Check(testkit.Rows( + "3", + "2", + "1", + )) + rows := testKit.MustQuery("show stats_histograms where table_name = 't'").Sort().Rows() + require.Len(t, rows, 3) + require.Equal(t, "1", rows[1][9]) + require.Equal(t, "-1", rows[2][9]) + testKit.MustExec("set session tidb_enable_extended_stats = on") + testKit.MustExec("alter table t add stats_extended s1 correlation(b,c)") + testKit.MustExec("analyze table t") + rows = testKit.MustQuery("show stats_extended where stats_name = 's1'").Sort().Rows() + require.Len(t, rows, 1) + require.Equal(t, "[b,c]", rows[0][3]) + require.Equal(t, "-1.000000", rows[0][5]) +} + +func TestLoadHistogramWithCollate(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + testKit.MustExec("use test") + testKit.MustExec("drop table if exists t") + testKit.MustExec("create table t(a varchar(10) collate utf8mb4_unicode_ci);") + testKit.MustExec("insert into t values('abcdefghij');") + testKit.MustExec("insert into t values('abcdufghij');") + testKit.MustExec("analyze table t with 0 topn;") + do := dom + h := do.StatsHandle() + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + _, err = h.TableStatsFromStorage(tblInfo, tblInfo.ID, true, 0) + require.NoError(t, err) +} + +func TestStatsCacheUpdateSkip(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + testKit := testkit.NewTestKit(t, store) + do := dom + h := do.StatsHandle() + testKit.MustExec("use test") + testKit.MustExec("create table t (c1 int, c2 int)") + testKit.MustExec("insert into t values(1, 2)") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + testKit.MustExec("analyze table t") + is := do.InfoSchema() + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tableInfo := tbl.Meta() + statsTbl1 := h.GetTableStats(tableInfo) + require.False(t, statsTbl1.Pseudo) + h.Update(context.Background(), is) + statsTbl2 := h.GetTableStats(tableInfo) + require.Equal(t, statsTbl2, statsTbl1) +} + +func testIncrementalModifyCountUpdateHelper(analyzeSnapshot bool) func(*testing.T) { + return func(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + if analyzeSnapshot { + tk.MustExec("set @@session.tidb_enable_analyze_snapshot = on") + } 
else {
+ tk.MustExec("set @@session.tidb_enable_analyze_snapshot = off")
+ }
+ tk.MustExec("create table t(a int)")
+ analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a")
+ tk.MustExec("set @@session.tidb_analyze_version = 2")
+ h := dom.StatsHandle()
+ err := h.HandleDDLEvent(<-h.DDLEventCh())
+ require.NoError(t, err)
+ tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
+ require.NoError(t, err)
+ tblInfo := tbl.Meta()
+ tid := tblInfo.ID
+
+ tk.MustExec("insert into t values(1),(2),(3)")
+ require.NoError(t, h.DumpStatsDeltaToKV(true))
+ err = h.Update(context.Background(), dom.InfoSchema())
+ require.NoError(t, err)
+ tk.MustExec("analyze table t")
+ tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tid)).Check(testkit.Rows(
+ "3 0",
+ ))
+
+ tk.MustExec("begin")
+ txn, err := tk.Session().Txn(false)
+ require.NoError(t, err)
+ startTS := txn.StartTS()
+ tk.MustExec("commit")
+
+ tk.MustExec("insert into t values(4),(5),(6)")
+ require.NoError(t, h.DumpStatsDeltaToKV(true))
+ err = h.Update(context.Background(), dom.InfoSchema())
+ require.NoError(t, err)
+
+ // Simulate that the analyze would start before and finish after the second insert.
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/injectAnalyzeSnapshot", fmt.Sprintf("return(%d)", startTS)))
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/injectBaseCount", "return(3)"))
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/executor/injectBaseModifyCount", "return(0)"))
+ tk.MustExec("analyze table t")
+ if analyzeSnapshot {
+ // Check the count / modify_count changes during the analyze are not lost.
+ tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tid)).Check(testkit.Rows(
+ "6 3",
+ ))
+ // Check the histogram is correct for the snapshot analyze.
+ tk.MustQuery(fmt.Sprintf("select distinct_count from mysql.stats_histograms where table_id = %d", tid)).Check(testkit.Rows(
+ "3",
+ ))
+ } else {
+ // Since analyze uses the max ts to read data, it finds that the row count is 6 and directly sets count to 6 rather than incrementally updating it.
+ // But it still incrementally updates modify_count.
+ tk.MustQuery(fmt.Sprintf("select count, modify_count from mysql.stats_meta where table_id = %d", tid)).Check(testkit.Rows(
+ "6 3",
+ ))
+ // Check the histogram is collected from the latest data rather than the snapshot at startTS.
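+ // All six rows are visible to the max-ts read, so distinct_count is 6 here instead of 3.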
+ tk.MustQuery(fmt.Sprintf("select distinct_count from mysql.stats_histograms where table_id = %d", tid)).Check(testkit.Rows( + "6", + )) + } + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/injectAnalyzeSnapshot")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/injectBaseCount")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/executor/injectBaseModifyCount")) + } +} + +func TestIncrementalModifyCountUpdate(t *testing.T) { + for _, analyzeSnapshot := range []bool{true, false} { + t.Run(fmt.Sprintf("%s-%t", t.Name(), analyzeSnapshot), testIncrementalModifyCountUpdateHelper(analyzeSnapshot)) + } +} + +func TestRecordHistoricalStatsToStorage(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 2") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b varchar(10))") + tk.MustExec("insert into t value(1, 'aaa'), (3, 'aab'), (5, 'bba'), (2, 'bbb'), (4, 'cca'), (6, 'ccc')") + // mark column stats as needed + tk.MustExec("select * from t where a = 3") + tk.MustExec("select * from t where b = 'bbb'") + tk.MustExec("alter table t add index single(a)") + tk.MustExec("alter table t add index multi(a, b)") + tk.MustExec("analyze table t with 2 topn") + + tableInfo, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + version, err := dom.StatsHandle().RecordHistoricalStatsToStorage("t", tableInfo.Meta(), tableInfo.Meta().ID, false) + require.NoError(t, err) + + rows := tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where version = '%d'", version)).Rows() + num, _ := strconv.Atoi(rows[0][0].(string)) + require.GreaterOrEqual(t, num, 1) +} + +func TestEvictedColumnLoadedStatus(t *testing.T) { + t.Skip("skip this test because it is useless") + restore := config.RestoreFunc() + defer restore() + config.UpdateGlobal(func(conf *config.Config) { + conf.Performance.EnableStatsCacheMemQuota = true + }) + store, dom := testkit.CreateMockStoreAndDomain(t) + dom.StatsHandle().SetLease(0) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@tidb_analyze_version = 1") + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("analyze table test.t") + tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.Nil(t, err) + tblStats := domain.GetDomain(tk.Session()).StatsHandle().GetTableStats(tbl.Meta()) + tblStats.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.True(t, col.IsStatsInitialized()) + return false + }) + + domain.GetDomain(tk.Session()).StatsHandle().SetStatsCacheCapacity(1) + tblStats = domain.GetDomain(tk.Session()).StatsHandle().GetTableStats(tbl.Meta()) + tblStats.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.True(t, col.IsStatsInitialized()) + return false + }) +} + +func TestUninitializedStatsStatus(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + dom.StatsHandle().SetLease(0) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int, b int, c int, index idx_a(a))") + h := dom.StatsHandle() + require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + tk.MustExec("insert into t values (1,2,2), 
(3,4,4), (5,6,6), (7,8,8), (9,10,10)") + require.NoError(t, h.DumpStatsDeltaToKV(true)) + is := dom.InfoSchema() + require.NoError(t, h.Update(context.Background(), is)) + tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + tblStats := h.GetTableStats(tblInfo) + tblStats.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.False(t, col.IsStatsInitialized()) + return false + }) + tblStats.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.False(t, idx.IsStatsInitialized()) + return false + }) + tk.MustQuery("show stats_histograms where db_name = 'test' and table_name = 't'").Check(testkit.Rows()) + checkStatsPseudo := func() { + rows := tk.MustQuery("explain select * from t").Rows() + operatorInfo := rows[len(rows)-1][4].(string) + require.True(t, strings.Contains(operatorInfo, "stats:pseudo")) + } + tk.MustExec("set @@tidb_enable_pseudo_for_outdated_stats = true") + checkStatsPseudo() + tk.MustExec("set @@tidb_enable_pseudo_for_outdated_stats = false") + checkStatsPseudo() +} + +func TestIssue39336(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(` +create table t1 ( + a datetime(3) default null, + b int +) partition by range (b) ( + partition p0 values less than (1000), + partition p1 values less than (maxvalue) +)`) + tk.MustExec("set @@sql_mode=''") + tk.MustExec("set @@tidb_analyze_version=2") + tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") + tk.MustExec(` +insert into t1 values +('1000-00-09 00:00:00.000', 1), +('1000-00-06 00:00:00.000', 1), +('1000-00-06 00:00:00.000', 1), +('2022-11-23 14:24:30.000', 1), +('2022-11-23 14:24:32.000', 1), +('2022-11-23 14:24:33.000', 1), +('2022-11-23 14:24:35.000', 1), +('2022-11-23 14:25:08.000', 1001), +('2022-11-23 14:25:09.000', 1001)`) + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t1", "a", "b") + tk.MustExec("analyze table t1 with 0 topn") + rows := tk.MustQuery("show analyze status where job_info like 'merge global stats%'").Rows() + require.Len(t, rows, 1) + require.Equal(t, "finished", rows[0][7]) +} + +func checkAllEvicted(t *testing.T, statsTbl *statistics.Table) { + statsTbl.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.True(t, col.IsAllEvicted()) + return false + }) + statsTbl.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.True(t, idx.IsAllEvicted()) + return false + }) +} + +func TestInitStatsLite(t *testing.T) { + oriVal := config.GetGlobalConfig().Performance.LiteInitStats + config.GetGlobalConfig().Performance.LiteInitStats = true + defer func() { + config.GetGlobalConfig().Performance.LiteInitStats = oriVal + }() + + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t(a int, b int, c int, primary key(a), key idxb(b), key idxc(c))") + tk.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),(6,6,6),(7,7,7),(8,8,8),(9,9,9)") + + h := dom.StatsHandle() + // set lease > 0 to trigger on-demand stats load. 
+	h.SetLease(time.Millisecond)
+	defer func() {
+		h.SetLease(0)
+	}()
+
+	is := dom.InfoSchema()
+	tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	tblInfo := tbl.Meta()
+	colBID := tblInfo.Columns[1].ID
+	colCID := tblInfo.Columns[2].ID
+	idxBID := tblInfo.Indices[0].ID
+	idxCID := tblInfo.Indices[1].ID
+
+	tk.MustExec("analyze table t with 2 topn, 2 buckets")
+	statsTbl0 := h.GetTableStats(tblInfo)
+	checkAllEvicted(t, statsTbl0)
+
+	h.Clear()
+	require.NoError(t, h.InitStatsLite(context.Background()))
+	statsTbl1 := h.GetTableStats(tblInfo)
+	checkAllEvicted(t, statsTbl1)
+	require.Equal(t, int(statistics.Version2), statsTbl1.StatsVer)
+	{
+		// internal.AssertTableEqual(t, statsTbl0, statsTbl1)
+		// statsTbl0 is loaded when the cache has a pseudo table.
+		// TODO: We haven't optimized the pseudo table's memory usage yet. So here the two will be different.
+		require.True(t, statsTbl0.ColNum() > 0)
+		require.True(t, statsTbl0.IdxNum() > 0)
+		require.True(t, statsTbl1.ColNum() == 0)
+		require.True(t, statsTbl1.IdxNum() == 0)
+	}
+
+	// async stats load
+	tk.MustExec("set @@tidb_stats_load_sync_wait = 0")
+	tk.MustExec("explain select * from t where b > 1")
+	require.NoError(t, h.LoadNeededHistograms(is))
+	statsTbl2 := h.GetTableStats(tblInfo)
+	colBStats1 := statsTbl2.GetCol(colBID)
+	colCStats := statsTbl2.GetCol(colCID)
+	require.True(t, colBStats1.IsFullLoad())
+	idxBStats1 := statsTbl2.GetIdx(idxBID)
+	require.True(t, idxBStats1.IsFullLoad())
+	require.True(t, colCStats.IsAllEvicted())
+
+	// sync stats load
+	tk.MustExec("set @@tidb_stats_load_sync_wait = 60000")
+	tk.MustExec("explain select * from t where c > 1")
+	statsTbl3 := h.GetTableStats(tblInfo)
+	colCStats1 := statsTbl3.GetCol(colCID)
+	require.True(t, colCStats1.IsFullLoad())
+	idxCStats1 := statsTbl3.GetIdx(idxCID)
+	require.True(t, idxCStats1.IsFullLoad())
+
+	// update stats
+	tk.MustExec("analyze table t with 1 topn, 3 buckets")
+	statsTbl4 := h.GetTableStats(tblInfo)
+	colBStats2 := statsTbl4.GetCol(colBID)
+	require.True(t, colBStats2.IsFullLoad())
+	require.Greater(t, colBStats2.LastUpdateVersion, colBStats1.LastUpdateVersion)
+	idxBStats2 := statsTbl4.GetIdx(idxBID)
+	require.True(t, idxBStats2.IsFullLoad())
+	require.Greater(t, idxBStats2.LastUpdateVersion, idxBStats1.LastUpdateVersion)
+	colCStats2 := statsTbl4.GetCol(colCID)
+	require.True(t, colCStats2.IsFullLoad())
+	require.Greater(t, colCStats2.LastUpdateVersion, colCStats1.LastUpdateVersion)
+	idxCStats2 := statsTbl4.GetIdx(idxCID)
+	require.True(t, idxCStats2.IsFullLoad())
+	require.Greater(t, idxCStats2.LastUpdateVersion, idxCStats1.LastUpdateVersion)
+}
+
+func TestSkipMissingPartitionStats(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'")
+	tk.MustExec("set @@tidb_skip_missing_partition_stats = 1")
+	tk.MustExec("create table t (a int, b int, c int, index idx_b(b)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200), partition p2 values less than (300))")
+	tk.MustExec("insert into t values (1,1,1), (2,2,2), (101,101,101), (102,102,102), (201,201,201), (202,202,202)")
+	analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b", "c")
+	h := dom.StatsHandle()
+	require.NoError(t, h.DumpStatsDeltaToKV(true))
+	tk.MustExec("analyze table t partition p0, p1")
+	tbl, err := 
dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + globalStats := h.GetTableStats(tblInfo) + require.Equal(t, 6, int(globalStats.RealtimeCount)) + require.Equal(t, 2, int(globalStats.ModifyCount)) + globalStats.ForEachColumnImmutable(func(_ int64, col *statistics.Column) bool { + require.True(t, col.IsStatsInitialized()) + return false + }) + globalStats.ForEachIndexImmutable(func(_ int64, idx *statistics.Index) bool { + require.True(t, idx.IsStatsInitialized()) + return false + }) +} + +func TestStatsCacheUpdateTimeout(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") + tk.MustExec("set @@tidb_skip_missing_partition_stats = 1") + tk.MustExec("create table t (a int, b int, c int, index idx_b(b)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200), partition p2 values less than (300))") + tk.MustExec("insert into t values (1,1,1), (2,2,2), (101,101,101), (102,102,102), (201,201,201), (202,202,202)") + analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b", "c") + h := dom.StatsHandle() + require.NoError(t, h.DumpStatsDeltaToKV(true)) + tk.MustExec("analyze table t partition p0, p1") + tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) + require.NoError(t, err) + tblInfo := tbl.Meta() + globalStats := h.GetTableStats(tblInfo) + require.Equal(t, 6, int(globalStats.RealtimeCount)) + require.Equal(t, 2, int(globalStats.ModifyCount)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/util/ExecRowsTimeout", "return()")) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/util/ExecRowsTimeout")) + }() + require.Error(t, h.Update(context.Background(), dom.InfoSchema())) + globalStats2 := h.GetTableStats(tblInfo) + require.Equal(t, 6, int(globalStats2.RealtimeCount)) + require.Equal(t, 2, int(globalStats2.ModifyCount)) +} diff --git a/pkg/statistics/handle/storage/read.go b/pkg/statistics/handle/storage/read.go new file mode 100644 index 0000000000000..7f42d144250a7 --- /dev/null +++ b/pkg/statistics/handle/storage/read.go @@ -0,0 +1,823 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package storage + +import ( + "context" + "encoding/json" + "strconv" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/meta/model" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/asyncload" + statslogutil "github.com/pingcap/tidb/pkg/statistics/handle/logutil" + statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" + "github.com/pingcap/tidb/pkg/statistics/handle/util" + "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util/chunk" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/pingcap/tidb/pkg/util/memory" + "github.com/pingcap/tidb/pkg/util/sqlexec" + "go.uber.org/zap" +) + +// StatsMetaCountAndModifyCount reads count and modify_count for the given table from mysql.stats_meta. +func StatsMetaCountAndModifyCount( + ctx context.Context, + sctx sessionctx.Context, + tableID int64, +) (count, modifyCount int64, isNull bool, err error) { + return statsMetaCountAndModifyCount(ctx, sctx, tableID, false) +} + +// StatsMetaCountAndModifyCountForUpdate reads count and modify_count for the given table from mysql.stats_meta with lock. +func StatsMetaCountAndModifyCountForUpdate( + ctx context.Context, + sctx sessionctx.Context, + tableID int64, +) (count, modifyCount int64, isNull bool, err error) { + return statsMetaCountAndModifyCount(ctx, sctx, tableID, true) +} + +func statsMetaCountAndModifyCount( + ctx context.Context, + sctx sessionctx.Context, + tableID int64, + forUpdate bool, +) (count, modifyCount int64, isNull bool, err error) { + sql := "select count, modify_count from mysql.stats_meta where table_id = %?" + if forUpdate { + sql += " for update" + } + rows, _, err := util.ExecRowsWithCtx(ctx, sctx, sql, tableID) + if err != nil { + return 0, 0, false, err + } + if len(rows) == 0 { + return 0, 0, true, nil + } + count = int64(rows[0].GetUint64(0)) + modifyCount = rows[0].GetInt64(1) + return count, modifyCount, false, nil +} + +// HistMetaFromStorageWithHighPriority reads the meta info of the histogram from the storage. +func HistMetaFromStorageWithHighPriority(sctx sessionctx.Context, item *model.TableItemID, possibleColInfo *model.ColumnInfo) (*statistics.Histogram, *types.Datum, int64, int64, error) { + isIndex := 0 + var tp *types.FieldType + if item.IsIndex { + isIndex = 1 + tp = types.NewFieldType(mysql.TypeBlob) + } else { + tp = &possibleColInfo.FieldType + } + rows, _, err := util.ExecRows(sctx, + "select high_priority distinct_count, version, null_count, tot_col_size, stats_ver, correlation, flag, last_analyze_pos from mysql.stats_histograms where table_id = %? and hist_id = %? and is_index = %?", + item.TableID, + item.ID, + isIndex, + ) + if err != nil { + return nil, nil, 0, 0, err + } + if len(rows) == 0 { + return nil, nil, 0, 0, nil + } + hist := statistics.NewHistogram(item.ID, rows[0].GetInt64(0), rows[0].GetInt64(2), rows[0].GetUint64(1), tp, chunk.InitialCapacity, rows[0].GetInt64(3)) + hist.Correlation = rows[0].GetFloat64(5) + lastPos := rows[0].GetDatum(7, types.NewFieldType(mysql.TypeBlob)) + return hist, &lastPos, rows[0].GetInt64(4), rows[0].GetInt64(6), nil +} + +// HistogramFromStorageWithPriority wraps the HistogramFromStorage with the given kv.Priority. 
+// Sync load and async load will use high priority to get data.
+func HistogramFromStorageWithPriority(
+	sctx sessionctx.Context,
+	tableID int64,
+	colID int64,
+	tp *types.FieldType,
+	distinct int64,
+	isIndex int,
+	ver uint64,
+	nullCount int64,
+	totColSize int64,
+	corr float64,
+	priority int,
+) (*statistics.Histogram, error) {
+	selectPrefix := "select "
+	switch priority {
+	case kv.PriorityHigh:
+		selectPrefix += "high_priority "
+	case kv.PriorityLow:
+		selectPrefix += "low_priority "
+	}
+	rows, fields, err := util.ExecRows(sctx, selectPrefix+"count, repeats, lower_bound, upper_bound, ndv from mysql.stats_buckets where table_id = %? and is_index = %? and hist_id = %? order by bucket_id", tableID, isIndex, colID)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	bucketSize := len(rows)
+	hg := statistics.NewHistogram(colID, distinct, nullCount, ver, tp, bucketSize, totColSize)
+	hg.Correlation = corr
+	totalCount := int64(0)
+	for i := 0; i < bucketSize; i++ {
+		count := rows[i].GetInt64(0)
+		repeats := rows[i].GetInt64(1)
+		var upperBound, lowerBound types.Datum
+		if isIndex == 1 {
+			lowerBound = rows[i].GetDatum(2, &fields[2].Column.FieldType)
+			upperBound = rows[i].GetDatum(3, &fields[3].Column.FieldType)
+		} else {
+			d := rows[i].GetDatum(2, &fields[2].Column.FieldType)
+			// For new collation data, when storing the bounds of the histogram, we store the collate key instead of the
+			// original value.
+			// But there's additional conversion logic for new collation data, and the collate key might be longer than
+			// the FieldType.flen.
+			// If we use the original FieldType here, there might be errors like "Invalid utf8mb4 character string"
+			// or "Data too long".
+			// So we change it to TypeBlob to bypass that logic here.
+			if tp.EvalType() == types.ETString && tp.GetType() != mysql.TypeEnum && tp.GetType() != mysql.TypeSet {
+				tp = types.NewFieldType(mysql.TypeBlob)
+			}
+			lowerBound, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, tp)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			d = rows[i].GetDatum(3, &fields[3].Column.FieldType)
+			upperBound, err = d.ConvertTo(statistics.UTCWithAllowInvalidDateCtx, tp)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+		}
+		totalCount += count
+		hg.AppendBucketWithNDV(&lowerBound, &upperBound, totalCount, repeats, rows[i].GetInt64(4))
+	}
+	hg.PreCalculateScalar()
+	return hg, nil
+}
+
+// CMSketchAndTopNFromStorageWithHighPriority reads CMSketch and TopN from storage.
+func CMSketchAndTopNFromStorageWithHighPriority(sctx sessionctx.Context, tblID int64, isIndex, histID, statsVer int64) (_ *statistics.CMSketch, _ *statistics.TopN, err error) {
+	topNRows, _, err := util.ExecRows(sctx, "select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID)
+	if err != nil {
+		return nil, nil, err
+	}
+	// If we are on a version higher than 1, don't read the Count-Min Sketch.
+	if statsVer > statistics.Version1 {
+		return statistics.DecodeCMSketchAndTopN(nil, topNRows)
+	}
+	rows, _, err := util.ExecRows(sctx, "select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? 
and hist_id = %?", tblID, isIndex, histID) + if err != nil { + return nil, nil, err + } + if len(rows) == 0 { + return statistics.DecodeCMSketchAndTopN(nil, topNRows) + } + return statistics.DecodeCMSketchAndTopN(rows[0].GetBytes(0), topNRows) +} + +// CMSketchFromStorage reads CMSketch from storage +func CMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex int, histID int64) (_ *statistics.CMSketch, err error) { + rows, _, err := util.ExecRows(sctx, "select cm_sketch from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) + if err != nil || len(rows) == 0 { + return nil, err + } + return statistics.DecodeCMSketch(rows[0].GetBytes(0)) +} + +// TopNFromStorage reads TopN from storage +func TopNFromStorage(sctx sessionctx.Context, tblID int64, isIndex int, histID int64) (_ *statistics.TopN, err error) { + rows, _, err := util.ExecRows(sctx, "select HIGH_PRIORITY value, count from mysql.stats_top_n where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) + if err != nil || len(rows) == 0 { + return nil, err + } + return statistics.DecodeTopN(rows), nil +} + +// FMSketchFromStorage reads FMSketch from storage +func FMSketchFromStorage(sctx sessionctx.Context, tblID int64, isIndex, histID int64) (_ *statistics.FMSketch, err error) { + rows, _, err := util.ExecRows(sctx, "select value from mysql.stats_fm_sketch where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histID) + if err != nil || len(rows) == 0 { + return nil, err + } + return statistics.DecodeFMSketch(rows[0].GetBytes(0)) +} + +// CheckSkipPartition checks if we can skip loading the partition. +func CheckSkipPartition(sctx sessionctx.Context, tblID int64, isIndex int) error { + rows, _, err := util.ExecRows(sctx, "select distinct_count from mysql.stats_histograms where table_id =%? and is_index = %?", tblID, isIndex) + if err != nil { + return err + } + if len(rows) == 0 { + return types.ErrPartitionStatsMissing + } + return nil +} + +// CheckSkipColumnPartiion checks if we can skip loading the partition. +func CheckSkipColumnPartiion(sctx sessionctx.Context, tblID int64, isIndex int, histsID int64) error { + rows, _, err := util.ExecRows(sctx, "select distinct_count from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tblID, isIndex, histsID) + if err != nil { + return err + } + if len(rows) == 0 { + return types.ErrPartitionColumnStatsMissing + } + return nil +} + +// ExtendedStatsFromStorage reads extended stats from storage. +func ExtendedStatsFromStorage(sctx sessionctx.Context, table *statistics.Table, tableID int64, loadAll bool) (*statistics.Table, error) { + failpoint.Inject("injectExtStatsLoadErr", func() { + failpoint.Return(nil, errors.New("gofail extendedStatsFromStorage error")) + }) + lastVersion := uint64(0) + if table.ExtendedStats != nil && !loadAll { + lastVersion = table.ExtendedStats.LastUpdateVersion + } else { + table.ExtendedStats = statistics.NewExtendedStatsColl() + } + rows, _, err := util.ExecRows(sctx, "select name, status, type, column_ids, stats, version from mysql.stats_extended where table_id = %? and status in (%?, %?, %?) 
and version > %?", + tableID, statistics.ExtendedStatsInited, statistics.ExtendedStatsAnalyzed, statistics.ExtendedStatsDeleted, lastVersion) + if err != nil || len(rows) == 0 { + return table, nil + } + for _, row := range rows { + lastVersion = max(lastVersion, row.GetUint64(5)) + name := row.GetString(0) + status := uint8(row.GetInt64(1)) + if status == statistics.ExtendedStatsDeleted || status == statistics.ExtendedStatsInited { + delete(table.ExtendedStats.Stats, name) + } else { + item := &statistics.ExtendedStatsItem{ + Tp: uint8(row.GetInt64(2)), + } + colIDs := row.GetString(3) + err := json.Unmarshal([]byte(colIDs), &item.ColIDs) + if err != nil { + statslogutil.StatsLogger().Error("decode column IDs failed", zap.String("column_ids", colIDs), zap.Error(err)) + return nil, err + } + statsStr := row.GetString(4) + if item.Tp == ast.StatsTypeCardinality || item.Tp == ast.StatsTypeCorrelation { + if statsStr != "" { + item.ScalarVals, err = strconv.ParseFloat(statsStr, 64) + if err != nil { + statslogutil.StatsLogger().Error("parse scalar stats failed", zap.String("stats", statsStr), zap.Error(err)) + return nil, err + } + } + } else { + item.StringVals = statsStr + } + table.ExtendedStats.Stats[name] = item + } + } + table.ExtendedStats.LastUpdateVersion = lastVersion + return table, nil +} + +func indexStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool, lease time.Duration, tracker *memory.Tracker) error { + histID := row.GetInt64(2) + distinct := row.GetInt64(3) + histVer := row.GetUint64(4) + nullCount := row.GetInt64(5) + statsVer := row.GetInt64(7) + idx := table.GetIdx(histID) + flag := row.GetInt64(8) + lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob)) + + for _, idxInfo := range tableInfo.Indices { + if histID != idxInfo.ID { + continue + } + table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, statsVer != statistics.Version0) + // All the objects in the table shares the same stats version. + // Update here. + if statsVer != statistics.Version0 { + table.StatsVer = int(statsVer) + table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer) + } + // We will not load buckets, topn and cmsketch if: + // 1. lease > 0, and: + // 2. the index doesn't have any of buckets, topn, cmsketch in memory before, and: + // 3. loadAll is false. + // 4. lite-init-stats is true(remove the condition when lite init stats is GA). + notNeedLoad := lease > 0 && + (idx == nil || ((!idx.IsStatsInitialized() || idx.IsAllEvicted()) && idx.LastUpdateVersion < histVer)) && + !loadAll && + config.GetGlobalConfig().Performance.LiteInitStats + if notNeedLoad { + // If we don't have this index in memory, skip it. 
+func indexStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool, lease time.Duration, tracker *memory.Tracker) error {
+	histID := row.GetInt64(2)
+	distinct := row.GetInt64(3)
+	histVer := row.GetUint64(4)
+	nullCount := row.GetInt64(5)
+	statsVer := row.GetInt64(7)
+	idx := table.GetIdx(histID)
+	flag := row.GetInt64(8)
+	lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
+
+	for _, idxInfo := range tableInfo.Indices {
+		if histID != idxInfo.ID {
+			continue
+		}
+		table.ColAndIdxExistenceMap.InsertIndex(idxInfo.ID, statsVer != statistics.Version0)
+		// All the objects in the table share the same stats version.
+		// Update it here.
+		if statsVer != statistics.Version0 {
+			table.StatsVer = int(statsVer)
+			table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
+		}
+		// We will not load buckets, topn and cmsketch if:
+		// 1. lease > 0, and:
+		// 2. the index doesn't have any of buckets, topn, cmsketch in memory before, and:
+		// 3. loadAll is false, and:
+		// 4. lite-init-stats is true (remove this condition when lite-init-stats is GA).
+		notNeedLoad := lease > 0 &&
+			(idx == nil || ((!idx.IsStatsInitialized() || idx.IsAllEvicted()) && idx.LastUpdateVersion < histVer)) &&
+			!loadAll &&
+			config.GetGlobalConfig().Performance.LiteInitStats
+		if notNeedLoad {
+			// If we don't have this index in memory, skip it.
+			if idx == nil {
+				return nil
+			}
+			idx = &statistics.Index{
+				Histogram:  *statistics.NewHistogram(histID, distinct, nullCount, histVer, types.NewFieldType(mysql.TypeBlob), 0, 0),
+				StatsVer:   statsVer,
+				Info:       idxInfo,
+				Flag:       flag,
+				PhysicalID: table.PhysicalID,
+			}
+			if idx.IsAnalyzed() {
+				idx.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+			lastAnalyzePos.Copy(&idx.LastAnalyzePos)
+			break
+		}
+		if idx == nil || idx.LastUpdateVersion < histVer || loadAll {
+			hg, err := HistogramFromStorageWithPriority(sctx, table.PhysicalID, histID, types.NewFieldType(mysql.TypeBlob), distinct, 1, histVer, nullCount, 0, 0, kv.PriorityNormal)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, table.PhysicalID, 1, idxInfo.ID, statsVer)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			var fmSketch *statistics.FMSketch
+			if loadAll {
+				// FMSketch is only used when merging partition stats into global stats. When merging partition stats into global stats,
+				// we load all the statistics, i.e., loadAll is true.
+				fmSketch, err = FMSketchFromStorage(sctx, table.PhysicalID, 1, histID)
+				if err != nil {
+					return errors.Trace(err)
+				}
+			}
+			idx = &statistics.Index{
+				Histogram:  *hg,
+				CMSketch:   cms,
+				TopN:       topN,
+				FMSketch:   fmSketch,
+				Info:       idxInfo,
+				StatsVer:   statsVer,
+				Flag:       flag,
+				PhysicalID: table.PhysicalID,
+			}
+			if statsVer != statistics.Version0 {
+				idx.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			}
+			lastAnalyzePos.Copy(&idx.LastAnalyzePos)
+		}
+		break
+	}
+	if idx != nil {
+		if tracker != nil {
+			tracker.Consume(idx.MemoryUsage().TotalMemoryUsage())
+		}
+		table.SetIdx(histID, idx)
+	} else {
+		logutil.BgLogger().Debug("we cannot find index id in table info. It may be deleted.", zap.Int64("indexID", histID), zap.String("table", tableInfo.Name.O))
+	}
+	return nil
+}
+
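+// For reference, the `notNeedLoad` fast path in the two decoders around this point keeps
+// only a histogram skeleton, built as statistics.NewHistogram(histID, distinct, nullCount,
+// histVer, ..., 0, 0) with an all-evicted StatsLoadedStatus: NDV and null counts remain
+// available to the planner while buckets, TopN and CMSketch stay in storage until a later
+// load is triggered.
+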
+func columnStatsFromStorage(sctx sessionctx.Context, row chunk.Row, table *statistics.Table, tableInfo *model.TableInfo, loadAll bool, lease time.Duration, tracker *memory.Tracker) error {
+	histID := row.GetInt64(2)
+	distinct := row.GetInt64(3)
+	histVer := row.GetUint64(4)
+	nullCount := row.GetInt64(5)
+	totColSize := row.GetInt64(6)
+	statsVer := row.GetInt64(7)
+	correlation := row.GetFloat64(9)
+	lastAnalyzePos := row.GetDatum(10, types.NewFieldType(mysql.TypeBlob))
+	col := table.GetCol(histID)
+	flag := row.GetInt64(8)
+
+	for _, colInfo := range tableInfo.Columns {
+		if histID != colInfo.ID {
+			continue
+		}
+		table.ColAndIdxExistenceMap.InsertCol(histID, statsVer != statistics.Version0 || distinct > 0 || nullCount > 0)
+		// All the objects in the table share the same stats version.
+		// Update it here.
+		if statsVer != statistics.Version0 {
+			table.StatsVer = int(statsVer)
+			table.LastAnalyzeVersion = max(table.LastAnalyzeVersion, histVer)
+		}
+		isHandle := tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag())
+		// We will not load buckets, topn and cmsketch if:
+		// 1. lease > 0, and:
+		// 2. this column is not a handle or lite-init-stats is true (remove this condition when lite-init-stats is GA), and:
+		// 3. the column doesn't have any of buckets, topn, cmsketch in memory before, and:
+		// 4. loadAll is false.
+		//
+		// Here is the explanation of the condition `!col.IsStatsInitialized() || col.IsAllEvicted()`.
+		// For one column:
+		// 1. If there is no stats for it in the storage (i.e., analyze has never been executed before), then its stats status
+		//    would be `!col.IsStatsInitialized()`. In this case we should take the `notNeedLoad` path.
+		// 2. If there exists stats for it in the storage but its stats status is `col.IsAllEvicted()`, there are two
+		//    sub cases for this case. One is that the column stats have never been used/needed by the optimizer so they have
+		//    never been loaded. The other is that the column stats were loaded and then evicted. For both sub cases,
+		//    we should take the `notNeedLoad` path.
+		// 3. If some parts (Histogram/TopN/CMSketch) of stats for it exist in TiDB memory currently, we choose to load all of
+		//    its new stats once we find its stats version is updated.
+		notNeedLoad := lease > 0 &&
+			(!isHandle || config.GetGlobalConfig().Performance.LiteInitStats) &&
+			(col == nil || ((!col.IsStatsInitialized() || col.IsAllEvicted()) && col.LastUpdateVersion < histVer)) &&
+			!loadAll
+		if notNeedLoad {
+			// If we don't have the column in memory currently, just skip it.
+			if col == nil {
+				return nil
+			}
+			col = &statistics.Column{
+				PhysicalID: table.PhysicalID,
+				Histogram:  *statistics.NewHistogram(histID, distinct, nullCount, histVer, &colInfo.FieldType, 0, totColSize),
+				Info:       colInfo,
+				IsHandle:   tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+				Flag:       flag,
+				StatsVer:   statsVer,
+			}
+			if col.StatsAvailable() {
+				col.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus()
+			}
+			lastAnalyzePos.Copy(&col.LastAnalyzePos)
+			col.Histogram.Correlation = correlation
+			break
+		}
+		if col == nil || col.LastUpdateVersion < histVer || loadAll {
+			hg, err := HistogramFromStorageWithPriority(sctx, table.PhysicalID, histID, &colInfo.FieldType, distinct, 0, histVer, nullCount, totColSize, correlation, kv.PriorityNormal)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, table.PhysicalID, 0, colInfo.ID, statsVer)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			var fmSketch *statistics.FMSketch
+			if loadAll {
+				// FMSketch is only used when merging partition stats into global stats. When merging partition stats into global stats,
+				// we load all the statistics, i.e., loadAll is true.
+				fmSketch, err = FMSketchFromStorage(sctx, table.PhysicalID, 0, histID)
+				if err != nil {
+					return errors.Trace(err)
+				}
+			}
+			col = &statistics.Column{
+				PhysicalID: table.PhysicalID,
+				Histogram:  *hg,
+				Info:       colInfo,
+				CMSketch:   cms,
+				TopN:       topN,
+				FMSketch:   fmSketch,
+				IsHandle:   tableInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()),
+				Flag:       flag,
+				StatsVer:   statsVer,
+			}
+			if col.StatsAvailable() {
+				col.StatsLoadedStatus = statistics.NewStatsFullLoadStatus()
+			}
+			lastAnalyzePos.Copy(&col.LastAnalyzePos)
+			break
+		}
+		if col.TotColSize != totColSize {
+			newCol := *col
+			newCol.TotColSize = totColSize
+			col = &newCol
+		}
+		break
+	}
+	if col != nil {
+		if tracker != nil {
+			tracker.Consume(col.MemoryUsage().TotalMemoryUsage())
+		}
+		table.SetCol(col.ID, col)
+	} else {
+		// If we didn't find the column in tableInfo, we won't load the histogram for it.
+		// But don't worry: the schema info will be updated at the next lease, and the table
+		// will be loaded again then, which avoids the error.
+		logutil.BgLogger().Debug("we cannot find column in table info now. It may be deleted", zap.Int64("colID", histID), zap.String("table", tableInfo.Name.O))
+	}
+	return nil
+}
+
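+// For reference, both indexStatsFromStorage and columnStatsFromStorage above decode rows
+// produced by the query in TableStatsFromStorage below, whose column offsets are:
+//
+//	0: table_id     1: is_index    2: hist_id         3: distinct_count
+//	4: version      5: null_count  6: tot_col_size    7: stats_ver
+//	8: flag         9: correlation 10: last_analyze_pos
+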
+// TableStatsFromStorage loads table stats info from storage.
+func TableStatsFromStorage(sctx sessionctx.Context, snapshot uint64, tableInfo *model.TableInfo, tableID int64, loadAll bool, lease time.Duration, table *statistics.Table) (_ *statistics.Table, err error) {
+	tracker := memory.NewTracker(memory.LabelForAnalyzeMemory, -1)
+	tracker.AttachTo(sctx.GetSessionVars().MemTracker)
+	defer tracker.Detach()
+	// If the table stats is pseudo, we also need to copy it, since we will use the column stats when
+	// the average error rate of it is small.
+	if table == nil || snapshot > 0 {
+		histColl := *statistics.NewHistColl(tableID, true, 0, 0, 4, 4)
+		table = &statistics.Table{
+			HistColl:              histColl,
+			ColAndIdxExistenceMap: statistics.NewColAndIndexExistenceMap(len(tableInfo.Columns), len(tableInfo.Indices)),
+		}
+	} else {
+		// We copy it before writing to avoid race.
+		table = table.Copy()
+	}
+	table.Pseudo = false
+
+	realtimeCount, modifyCount, isNull, err := StatsMetaCountAndModifyCount(util.StatsCtx, sctx, tableID)
+	if err != nil || isNull {
+		return nil, err
+	}
+	table.ModifyCount = modifyCount
+	table.RealtimeCount = realtimeCount
+
+	rows, _, err := util.ExecRows(sctx, "select table_id, is_index, hist_id, distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %?", tableID)
+	if err != nil {
+		return nil, err
+	}
+	// Check deleted table.
+	if len(rows) == 0 {
+		return nil, nil
+	}
+	for _, row := range rows {
+		if err := sctx.GetSessionVars().SQLKiller.HandleSignal(); err != nil {
+			return nil, err
+		}
+		if row.GetInt64(1) > 0 {
+			err = indexStatsFromStorage(sctx, row, table, tableInfo, loadAll, lease, tracker)
+		} else {
+			err = columnStatsFromStorage(sctx, row, table, tableInfo, loadAll, lease, tracker)
+		}
+		if err != nil {
+			return nil, err
+		}
+	}
+	table.ColAndIdxExistenceMap.SetChecked()
+	return ExtendedStatsFromStorage(sctx, table, tableID, loadAll)
+}
+
+// LoadHistogram will load histogram from storage.
+func LoadHistogram(sctx sessionctx.Context, tableID int64, isIndex int, histID int64, tableInfo *model.TableInfo) (*statistics.Histogram, error) {
+	row, _, err := util.ExecRows(sctx, "select distinct_count, version, null_count, tot_col_size, stats_ver, flag, correlation, last_analyze_pos from mysql.stats_histograms where table_id = %? and is_index = %? and hist_id = %?", tableID, isIndex, histID)
+	if err != nil || len(row) == 0 {
+		return nil, err
+	}
+	distinct := row[0].GetInt64(0)
+	histVer := row[0].GetUint64(1)
+	nullCount := row[0].GetInt64(2)
+	var totColSize int64
+	var corr float64
+	var tp types.FieldType
+	if isIndex == 0 {
+		totColSize = row[0].GetInt64(3)
+		corr = row[0].GetFloat64(6)
+		for _, colInfo := range tableInfo.Columns {
+			if histID != colInfo.ID {
+				continue
+			}
+			tp = colInfo.FieldType
+			break
+		}
+		return HistogramFromStorageWithPriority(sctx, tableID, histID, &tp, distinct, isIndex, histVer, nullCount, totColSize, corr, kv.PriorityNormal)
+	}
+	return HistogramFromStorageWithPriority(sctx, tableID, histID, types.NewFieldType(mysql.TypeBlob), distinct, isIndex, histVer, nullCount, 0, 0, kv.PriorityNormal)
+}
+
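+// For reference, the loader below is driven through the stats handle: after an `explain`
+// marks columns as needed (see TestInitStatsLite in handle_test.go), a call such as
+// h.LoadNeededHistograms(is) drains asyncload.AsyncLoadHistogramNeededItems and fills
+// the cache entry for each item.
+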
+// LoadNeededHistograms will load histograms for those needed columns/indices.
+func LoadNeededHistograms(sctx sessionctx.Context, is infoschema.InfoSchema, statsHandle statstypes.StatsHandle, loadFMSketch bool) (err error) {
+	items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
+	for _, item := range items {
+		if !item.IsIndex {
+			err = loadNeededColumnHistograms(sctx, statsHandle, item.TableItemID, loadFMSketch, item.FullLoad)
+		} else {
+			// Indexes are always fully loaded.
+			err = loadNeededIndexHistograms(sctx, is, statsHandle, item.TableItemID, loadFMSketch)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// CleanFakeItemsForShowHistInFlights cleans the invalid inserted items.
+func CleanFakeItemsForShowHistInFlights(statsCache statstypes.StatsCache) int {
+	items := asyncload.AsyncLoadHistogramNeededItems.AllItems()
+	reallyNeeded := 0
+	for _, item := range items {
+		tbl, ok := statsCache.Get(item.TableID)
+		if !ok {
+			asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
+			continue
+		}
+		loadNeeded := false
+		if item.IsIndex {
+			_, loadNeeded = tbl.IndexIsLoadNeeded(item.ID)
+		} else {
+			var analyzed bool
+			_, loadNeeded, analyzed = tbl.ColumnIsLoadNeeded(item.ID, item.FullLoad)
+			loadNeeded = loadNeeded && analyzed
+		}
+		if !loadNeeded {
+			asyncload.AsyncLoadHistogramNeededItems.Delete(item.TableItemID)
+			continue
+		}
+		reallyNeeded++
+	}
+	return reallyNeeded
+}
+
+func loadNeededColumnHistograms(sctx sessionctx.Context, statsHandle statstypes.StatsHandle, col model.TableItemID, loadFMSketch bool, fullLoad bool) (err error) {
+	statsTbl, ok := statsHandle.Get(col.TableID)
+	if !ok {
+		return nil
+	}
+	// Currently, we cannot init the column info in the ColAndIdxExistenceMap when lite-init-stats is disabled,
+	// so we have to get the column info from the domain.
+	is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
+	tbl, ok := statsHandle.TableInfoByID(is, col.TableID)
+	if !ok {
+		return nil
+	}
+	tblInfo := tbl.Meta()
+	colInfo := tblInfo.GetColumnByID(col.ID)
+	if colInfo == nil {
+		asyncload.AsyncLoadHistogramNeededItems.Delete(col)
+		return nil
+	}
+
+	_, loadNeeded, analyzed := statsTbl.ColumnIsLoadNeeded(col.ID, true)
+	if !loadNeeded || !analyzed {
+		// If this column is not analyzed yet and we don't have it in memory, we create a fake one
+		// for the pseudo estimation.
+		// Otherwise, it will trigger the sync/async load again, even if the column has not been analyzed.
+ if loadNeeded && !analyzed { + fakeCol := statistics.EmptyColumn(tblInfo.ID, tblInfo.PKIsHandle, colInfo) + statsTbl.SetCol(col.ID, fakeCol) + statsHandle.UpdateStatsCache(statstypes.CacheUpdate{ + Updated: []*statistics.Table{statsTbl}, + }) + } + asyncload.AsyncLoadHistogramNeededItems.Delete(col) + return nil + } + + hg, _, statsVer, _, err := HistMetaFromStorageWithHighPriority(sctx, &col, colInfo) + if hg == nil || err != nil { + asyncload.AsyncLoadHistogramNeededItems.Delete(col) + return err + } + var ( + cms *statistics.CMSketch + topN *statistics.TopN + fms *statistics.FMSketch + ) + if fullLoad { + hg, err = HistogramFromStorageWithPriority(sctx, col.TableID, col.ID, &colInfo.FieldType, hg.NDV, 0, hg.LastUpdateVersion, hg.NullCount, hg.TotColSize, hg.Correlation, kv.PriorityHigh) + if err != nil { + return errors.Trace(err) + } + cms, topN, err = CMSketchAndTopNFromStorageWithHighPriority(sctx, col.TableID, 0, col.ID, statsVer) + if err != nil { + return errors.Trace(err) + } + if loadFMSketch { + fms, err = FMSketchFromStorage(sctx, col.TableID, 0, col.ID) + if err != nil { + return errors.Trace(err) + } + } + } + + colHist := &statistics.Column{ + PhysicalID: col.TableID, + Histogram: *hg, + Info: colInfo, + CMSketch: cms, + TopN: topN, + FMSketch: fms, + IsHandle: tblInfo.PKIsHandle && mysql.HasPriKeyFlag(colInfo.GetFlag()), + StatsVer: statsVer, + } + // Reload the latest stats cache, otherwise the `updateStatsCache` may fail with high probability, because functions + // like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already. + statsTbl, ok = statsHandle.Get(col.TableID) + if !ok { + return nil + } + statsTbl = statsTbl.Copy() + if colHist.StatsAvailable() { + if fullLoad { + colHist.StatsLoadedStatus = statistics.NewStatsFullLoadStatus() + } else { + colHist.StatsLoadedStatus = statistics.NewStatsAllEvictedStatus() + } + if statsVer != statistics.Version0 { + statsTbl.LastAnalyzeVersion = max(statsTbl.LastAnalyzeVersion, colHist.LastUpdateVersion) + statsTbl.StatsVer = int(statsVer) + } + } + statsTbl.SetCol(col.ID, colHist) + statsHandle.UpdateStatsCache(statstypes.CacheUpdate{ + Updated: []*statistics.Table{statsTbl}, + }) + asyncload.AsyncLoadHistogramNeededItems.Delete(col) + if col.IsSyncLoadFailed { + logutil.BgLogger().Warn("Hist for column should already be loaded as sync but not found.", + zap.Int64("table_id", colHist.PhysicalID), + zap.Int64("column_id", colHist.Info.ID), + zap.String("column_name", colHist.Info.Name.O)) + } + return nil +} + +func loadNeededIndexHistograms(sctx sessionctx.Context, is infoschema.InfoSchema, statsHandle statstypes.StatsHandle, idx model.TableItemID, loadFMSketch bool) (err error) { + tbl, ok := statsHandle.Get(idx.TableID) + if !ok { + return nil + } + _, loadNeeded := tbl.IndexIsLoadNeeded(idx.ID) + if !loadNeeded { + asyncload.AsyncLoadHistogramNeededItems.Delete(idx) + return nil + } + hgMeta, lastAnalyzePos, statsVer, flag, err := HistMetaFromStorageWithHighPriority(sctx, &idx, nil) + if hgMeta == nil || err != nil { + asyncload.AsyncLoadHistogramNeededItems.Delete(idx) + return err + } + tblInfo, ok := statsHandle.TableInfoByID(is, idx.TableID) + if !ok { + return nil + } + idxInfo := tblInfo.Meta().FindIndexByID(idx.ID) + hg, err := HistogramFromStorageWithPriority(sctx, idx.TableID, idx.ID, types.NewFieldType(mysql.TypeBlob), hgMeta.NDV, 1, hgMeta.LastUpdateVersion, hgMeta.NullCount, hgMeta.TotColSize, hgMeta.Correlation, kv.PriorityHigh) + if err != nil { + return 
errors.Trace(err) + } + cms, topN, err := CMSketchAndTopNFromStorageWithHighPriority(sctx, idx.TableID, 1, idx.ID, statsVer) + if err != nil { + return errors.Trace(err) + } + var fms *statistics.FMSketch + if loadFMSketch { + fms, err = FMSketchFromStorage(sctx, idx.TableID, 1, idx.ID) + if err != nil { + return errors.Trace(err) + } + } + idxHist := &statistics.Index{Histogram: *hg, CMSketch: cms, TopN: topN, FMSketch: fms, + Info: idxInfo, StatsVer: statsVer, + Flag: flag, PhysicalID: idx.TableID, + StatsLoadedStatus: statistics.NewStatsFullLoadStatus()} + lastAnalyzePos.Copy(&idxHist.LastAnalyzePos) + + tbl, ok = statsHandle.Get(idx.TableID) + if !ok { + return nil + } + tbl = tbl.Copy() + if idxHist.StatsVer != statistics.Version0 { + tbl.StatsVer = int(idxHist.StatsVer) + tbl.LastAnalyzeVersion = max(tbl.LastAnalyzeVersion, idxHist.LastUpdateVersion) + } + tbl.SetIdx(idx.ID, idxHist) + statsHandle.UpdateStatsCache(statstypes.CacheUpdate{ + Updated: []*statistics.Table{tbl}, + }) + if idx.IsSyncLoadFailed { + logutil.BgLogger().Warn("Hist for index should already be loaded as sync but not found.", + zap.Int64("table_id", idx.TableID), + zap.Int64("index_id", idxHist.Info.ID), + zap.String("index_name", idxHist.Info.Name.O)) + } + asyncload.AsyncLoadHistogramNeededItems.Delete(idx) + return nil +} + +// StatsMetaByTableIDFromStorage gets the stats meta of a table from storage. +func StatsMetaByTableIDFromStorage(sctx sessionctx.Context, tableID int64, snapshot uint64) (version uint64, modifyCount, count int64, err error) { + var rows []chunk.Row + if snapshot == 0 { + rows, _, err = util.ExecRows(sctx, + "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? order by version", tableID) + } else { + rows, _, err = util.ExecWithOpts(sctx, + []sqlexec.OptionFuncAlias{sqlexec.ExecOptionWithSnapshot(snapshot), sqlexec.ExecOptionUseCurSession}, + "SELECT version, modify_count, count from mysql.stats_meta where table_id = %? 
order by version", tableID) + } + if err != nil || len(rows) == 0 { + return + } + version = rows[0].GetUint64(0) + modifyCount = rows[0].GetInt64(1) + count = rows[0].GetInt64(2) + return +} diff --git a/pkg/statistics/handle/util/BUILD.bazel b/pkg/statistics/handle/util/BUILD.bazel new file mode 100644 index 0000000000000..a40c702ffb6b6 --- /dev/null +++ b/pkg/statistics/handle/util/BUILD.bazel @@ -0,0 +1,49 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "util", + srcs = [ + "auto_analyze_proc_id_generator.go", + "lease_getter.go", + "pool.go", + "table_info.go", + "util.go", + ], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/util", + visibility = ["//visibility:public"], + deps = [ + "//pkg/infoschema", + "//pkg/kv", + "//pkg/meta/model", + "//pkg/parser/terror", + "//pkg/planner/core/resolve", + "//pkg/sessionctx", + "//pkg/sessionctx/sysproctrack", + "//pkg/sessionctx/variable", + "//pkg/table", + "//pkg/types", + "//pkg/util", + "//pkg/util/chunk", + "//pkg/util/intest", + "//pkg/util/sqlexec", + "//pkg/util/sqlexec/mock", + "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", + "@com_github_tiancaiamao_gp//:gp", + "@com_github_tikv_client_go_v2//oracle", + "@org_golang_x_exp//maps", + "@org_uber_go_atomic//:atomic", + ], +) + +go_test( + name = "util_test", + timeout = "short", + srcs = ["util_test.go"], + flaky = True, + deps = [ + ":util", + "//pkg/testkit", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/statistics/handle/util/util.go b/pkg/statistics/handle/util/util.go new file mode 100644 index 0000000000000..e01ed16ca52a1 --- /dev/null +++ b/pkg/statistics/handle/util/util.go @@ -0,0 +1,283 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package util
+
+import (
+	"context"
+	"strconv"
+	"time"
+
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/parser/terror"
+	"github.com/pingcap/tidb/pkg/planner/core/resolve"
+	"github.com/pingcap/tidb/pkg/sessionctx"
+	"github.com/pingcap/tidb/pkg/sessionctx/variable"
+	"github.com/pingcap/tidb/pkg/types"
+	"github.com/pingcap/tidb/pkg/util"
+	"github.com/pingcap/tidb/pkg/util/chunk"
+	"github.com/pingcap/tidb/pkg/util/intest"
+	"github.com/pingcap/tidb/pkg/util/sqlexec"
+	"github.com/pingcap/tidb/pkg/util/sqlexec/mock"
+	"github.com/tikv/client-go/v2/oracle"
+)
+
+const (
+	// StatsMetaHistorySourceAnalyze indicates stats history meta source from analyze
+	StatsMetaHistorySourceAnalyze = "analyze"
+	// StatsMetaHistorySourceLoadStats indicates stats history meta source from load stats
+	StatsMetaHistorySourceLoadStats = "load stats"
+	// StatsMetaHistorySourceFlushStats indicates stats history meta source from flush stats
+	StatsMetaHistorySourceFlushStats = "flush stats"
+	// StatsMetaHistorySourceSchemaChange indicates stats history meta source from schema change
+	StatsMetaHistorySourceSchemaChange = "schema change"
+	// StatsMetaHistorySourceExtendedStats indicates stats history meta source from extended stats
+	StatsMetaHistorySourceExtendedStats = "extended stats"
+)
+
+var (
+	// UseCurrentSessionOpt makes sure the sql is executed in the current session.
+	UseCurrentSessionOpt = []sqlexec.OptionFuncAlias{sqlexec.ExecOptionUseCurSession}
+
+	// StatsCtx is used to mark that the request is from the stats module.
+	StatsCtx = kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
+)
+
+// finishTransaction will execute `commit` when error is nil, otherwise `rollback`.
+func finishTransaction(sctx sessionctx.Context, err error) error {
+	if err == nil {
+		_, _, err = ExecRows(sctx, "COMMIT")
+	} else {
+		_, _, err1 := ExecRows(sctx, "rollback")
+		terror.Log(errors.Trace(err1))
+	}
+	return errors.Trace(err)
+}
+
+var (
+	// FlagWrapTxn indicates whether to wrap a transaction.
+	FlagWrapTxn = 0
+)
+
+// CallWithSCtx allocates a sctx from the pool and calls f() with it.
+func CallWithSCtx(pool util.SessionPool, f func(sctx sessionctx.Context) error, flags ...int) (err error) {
+	se, err := pool.Get()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		if err == nil { // only recycle when no error
+			pool.Put(se)
+		}
+	}()
+	sctx := se.(sessionctx.Context)
+	if err := UpdateSCtxVarsForStats(sctx); err != nil { // update stats variables automatically
+		return err
+	}
+
+	wrapTxn := false
+	for _, flag := range flags {
+		if flag == FlagWrapTxn {
+			wrapTxn = true
+		}
+	}
+	if wrapTxn {
+		err = WrapTxn(sctx, f)
+	} else {
+		err = f(sctx)
+	}
+	return err
+}
+
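+// CallWithSCtx is typically combined with FlagWrapTxn when several statements must see
+// the same data. A minimal sketch (pool, tid and the SQL are illustrative only):
+//
+//	err := CallWithSCtx(pool, func(sctx sessionctx.Context) error {
+//		if _, _, err := ExecRows(sctx, "select count from mysql.stats_meta where table_id = %?", tid); err != nil {
+//			return err
+//		}
+//		_, _, err := ExecRows(sctx, "select version from mysql.stats_meta where table_id = %?", tid)
+//		return err
+//	}, FlagWrapTxn)
+//
+// A non-nil error from the callback makes WrapTxn roll the transaction back via
+// finishTransaction; a nil error commits it.
+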
+// UpdateSCtxVarsForStats updates all necessary variables that may affect the behavior of statistics.
+func UpdateSCtxVarsForStats(sctx sessionctx.Context) error {
+	// async merge global stats
+	enableAsyncMergeGlobalStats, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAsyncMergeGlobalStats)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().EnableAsyncMergeGlobalStats = variable.TiDBOptOn(enableAsyncMergeGlobalStats)
+
+	// concurrency of saving stats to storage
+	analyzePartitionConcurrency, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzePartitionConcurrency)
+	if err != nil {
+		return err
+	}
+	c, err := strconv.ParseInt(analyzePartitionConcurrency, 10, 64)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().AnalyzePartitionConcurrency = int(c)
+
+	// analyzer version
+	verInString, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeVersion)
+	if err != nil {
+		return err
+	}
+	ver, err := strconv.ParseInt(verInString, 10, 64)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().AnalyzeVersion = int(ver)
+
+	// enable historical stats
+	val, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableHistoricalStats)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().EnableHistoricalStats = variable.TiDBOptOn(val)
+
+	// partition prune mode
+	pruneMode, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBPartitionPruneMode)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().PartitionPruneMode.Store(pruneMode)
+
+	// enable analyze snapshot
+	analyzeSnapshot, err := sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBEnableAnalyzeSnapshot)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().EnableAnalyzeSnapshot = variable.TiDBOptOn(analyzeSnapshot)
+
+	// analyze skip column types
+	val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBAnalyzeSkipColumnTypes)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().AnalyzeSkipColumnTypes = variable.ParseAnalyzeSkipColumnTypes(val)
+
+	// skip missing partition stats
+	val, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBSkipMissingPartitionStats)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().SkipMissingPartitionStats = variable.TiDBOptOn(val)
+
+	// concurrency of merging partition stats
+	verInString, err = sctx.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBMergePartitionStatsConcurrency)
+	if err != nil {
+		return err
+	}
+	ver, err = strconv.ParseInt(verInString, 10, 64)
+	if err != nil {
+		return err
+	}
+	sctx.GetSessionVars().AnalyzePartitionMergeConcurrency = int(ver)
+	return nil
+}
+
+// GetCurrentPruneMode returns the current latest partitioning table prune mode.
+func GetCurrentPruneMode(pool util.SessionPool) (mode string, err error) {
+	err = CallWithSCtx(pool, func(sctx sessionctx.Context) error {
+		mode = sctx.GetSessionVars().PartitionPruneMode.Load()
+		return nil
+	})
+	return
+}
+
+// WrapTxn wraps the operation in a pessimistic transaction so that the different SQLs
+// in the operation share the same data visibility.
+func WrapTxn(sctx sessionctx.Context, f func(sctx sessionctx.Context) error) (err error) {
+	// TODO: check whether this sctx is already in a txn
+	if _, _, err := ExecRows(sctx, "BEGIN PESSIMISTIC"); err != nil {
+		return err
+	}
+	defer func() {
+		err = finishTransaction(sctx, err)
+	}()
+	err = f(sctx)
+	return
+}
+
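+// A minimal sketch of reading under one pinned snapshot (tid is illustrative only):
+//
+//	err := WrapTxn(sctx, func(sctx sessionctx.Context) error {
+//		startTS, err := GetStartTS(sctx) // see below; reads in f observe this snapshot
+//		if err != nil {
+//			return err
+//		}
+//		_ = startTS
+//		_, _, err = ExecRows(sctx, "select count from mysql.stats_meta where table_id = %?", tid)
+//		return err
+//	})
+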
+// GetStartTS gets the start ts from the current transaction.
+func GetStartTS(sctx sessionctx.Context) (uint64, error) {
+	txn, err := sctx.Txn(true)
+	if err != nil {
+		return 0, err
+	}
+	return txn.StartTS(), nil
+}
+
+// Exec is a helper function to execute sql and return RecordSet.
+func Exec(sctx sessionctx.Context, sql string, args ...any) (sqlexec.RecordSet, error) {
+	return ExecWithCtx(StatsCtx, sctx, sql, args...)
+}
+
+// ExecWithCtx is a helper function to execute sql and return RecordSet.
+func ExecWithCtx(
+	ctx context.Context,
+	sctx sessionctx.Context,
+	sql string,
+	args ...any,
+) (sqlexec.RecordSet, error) {
+	sqlExec := sctx.GetSQLExecutor()
+	// TODO: use RestrictedSQLExecutor + ExecOptionUseCurSession instead of SQLExecutor
+	return sqlExec.ExecuteInternal(ctx, sql, args...)
+}
+
+// ExecRows is a helper function to execute sql and return rows and fields.
+func ExecRows(sctx sessionctx.Context, sql string, args ...any) (rows []chunk.Row, fields []*resolve.ResultField, err error) {
+	failpoint.Inject("ExecRowsTimeout", func() {
+		failpoint.Return(nil, nil, errors.New("inject timeout error"))
+	})
+	return ExecRowsWithCtx(StatsCtx, sctx, sql, args...)
+}
+
+// ExecRowsWithCtx is a helper function to execute sql and return rows and fields.
+func ExecRowsWithCtx(
+	ctx context.Context,
+	sctx sessionctx.Context,
+	sql string,
+	args ...any,
+) (rows []chunk.Row, fields []*resolve.ResultField, err error) {
+	if intest.InTest {
+		if v := sctx.Value(mock.RestrictedSQLExecutorKey{}); v != nil {
+			return v.(*mock.MockRestrictedSQLExecutor).ExecRestrictedSQL(
+				StatsCtx, UseCurrentSessionOpt, sql, args...,
+			)
+		}
+	}
+
+	sqlExec := sctx.GetRestrictedSQLExecutor()
+	return sqlExec.ExecRestrictedSQL(ctx, UseCurrentSessionOpt, sql, args...)
+}
+
+// ExecWithOpts is a helper function to execute sql and return rows and fields.
+func ExecWithOpts(sctx sessionctx.Context, opts []sqlexec.OptionFuncAlias, sql string, args ...any) (rows []chunk.Row, fields []*resolve.ResultField, err error) {
+	sqlExec := sctx.GetRestrictedSQLExecutor()
+	return sqlExec.ExecRestrictedSQL(StatsCtx, opts, sql, args...)
+}
+
+// DurationToTS converts duration to timestamp.
+func DurationToTS(d time.Duration) uint64 {
+	return oracle.ComposeTS(d.Nanoseconds()/int64(time.Millisecond), 0)
+}
+
+// IsSpecialGlobalIndex checks whether an index is a special global index.
+// A special global index is one that is a global index and has virtual generated columns or prefix columns.
+func IsSpecialGlobalIndex(idx *model.IndexInfo, tblInfo *model.TableInfo) bool {
+	if !idx.Global {
+		return false
+	}
+	for _, col := range idx.Columns {
+		colInfo := tblInfo.Columns[col.Offset]
+		isPrefixCol := col.Length != types.UnspecifiedLength
+		if colInfo.IsVirtualGenerated() || isPrefixCol {
+			return true
+		}
+	}
+	return false
+}
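+
+// For illustration, minimal sketches of the two helpers above (identifiers and values
+// are illustrative only):
+//
+//	ts := DurationToTS(3 * time.Second)
+//	_ = oracle.ExtractPhysical(ts) // 3000, i.e. the 3s lease as a physical timestamp in ms
+//
+//	// A global index with a virtual generated column or a prefix column
+//	// (col.Length != types.UnspecifiedLength) is reported as special:
+//	_ = IsSpecialGlobalIndex(idxInfo, tblInfo)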