Skip to content

Commit

Permalink
statistics: use DDL subscriber updating stats meta
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <tech@rustin.me>
  • Loading branch information
Rustin170506 committed Dec 11, 2024
1 parent 93295c2 commit 39eebe4
Show file tree
Hide file tree
Showing 55 changed files with 1,037 additions and 881 deletions.
23 changes: 13 additions & 10 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,18 +588,21 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo
return nil
}

ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
forLoop:
for i := 0; i < 10; i++ {
select {
case ch <- e:
break forLoop
default:
time.Sleep(time.Microsecond * 10)
// Only for test.
if intest.InTest {
ch := jobCtx.oldDDLCtx.ddlEventCh
if ch != nil {
forLoop:
for i := 0; i < 10; i++ {
select {
case ch <- e:
break forLoop
default:
time.Sleep(time.Microsecond * 10)
}
}
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
}
logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e))
}

intest.Assert(jobCtx.eventPublishStore != nil, "eventPublishStore should not be nil")
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/statistics/handle/util",
"//pkg/store/gcworker",
"//pkg/store/mockstore",
"//pkg/table",
Expand Down
49 changes: 41 additions & 8 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
Expand Down Expand Up @@ -2947,7 +2948,11 @@ func TestRemoveKeyPartitioning(t *testing.T) {
tk.MustExec("create database RemovePartitioning")
tk.MustExec("use RemovePartitioning")
tk.MustExec(`create table t (a varchar(255), b varchar(255), key (a,b), key (b)) partition by key (a) partitions 7`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// Fill the data with ascii strings
for i := 32; i <= 126; i++ {
tk.MustExec(fmt.Sprintf(`insert into t values (char(%d,%d,%d),char(%d,%d,%d,%d))`, i, i, i, i, i, i, i))
Expand Down Expand Up @@ -2979,7 +2984,11 @@ func TestRemoveKeyPartitioning(t *testing.T) {
" KEY `b` (`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// Statistics are updated asynchronously
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
Expand All @@ -2997,7 +3006,11 @@ func TestRemoveListPartitioning(t *testing.T) {
tk.MustExec("create database RemoveListPartitioning")
tk.MustExec("use RemoveListPartitioning")
tk.MustExec(`create table t (a int, b varchar(255), key (a,b), key (b)) partition by list (a) (partition p0 values in (0), partition p1 values in (1), partition p2 values in (2), partition p3 values in (3), partition p4 values in (4))`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// Fill the data with ascii strings
for i := 32; i <= 126; i++ {
tk.MustExec(fmt.Sprintf(`insert into t values (%d,char(%d,%d,%d,%d))`, i%5, i, i, i, i))
Expand Down Expand Up @@ -3025,7 +3038,11 @@ func TestRemoveListPartitioning(t *testing.T) {
" KEY `b` (`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// Statistics are updated asynchronously
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
Expand All @@ -3043,7 +3060,11 @@ func TestRemoveListColumnPartitioning(t *testing.T) {
tk.MustExec("create database RemoveListPartitioning")
tk.MustExec("use RemoveListPartitioning")
tk.MustExec(`create table t (a varchar(255), b varchar(255), key (a,b), key (b)) partition by list columns (a) (partition p0 values in ("0"), partition p1 values in ("1"), partition p2 values in ("2"), partition p3 values in ("3"), partition p4 values in ("4"))`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// Fill the data with ascii strings
for i := 32; i <= 126; i++ {
tk.MustExec(fmt.Sprintf(`insert into t values ("%d",char(%d,%d,%d,%d))`, i%5, i, i, i, i))
Expand Down Expand Up @@ -3071,7 +3092,11 @@ func TestRemoveListColumnPartitioning(t *testing.T) {
" KEY `b` (`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// Statistics are updated asynchronously
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
Expand All @@ -3089,7 +3114,11 @@ func TestRemoveListColumnsPartitioning(t *testing.T) {
tk.MustExec("create database RemoveListPartitioning")
tk.MustExec("use RemoveListPartitioning")
tk.MustExec(`create table t (a int, b varchar(255), key (a,b), key (b)) partition by list columns (a,b) (partition p0 values in ((0,"0")), partition p1 values in ((1,"1")), partition p2 values in ((2,"2")), partition p3 values in ((3,"3")), partition p4 values in ((4,"4")))`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// Fill the data
for i := 32; i <= 126; i++ {
tk.MustExec(fmt.Sprintf(`insert into t values (%d,"%d")`, i%5, i%5))
Expand Down Expand Up @@ -3117,7 +3146,11 @@ func TestRemoveListColumnsPartitioning(t *testing.T) {
" KEY `b` (`b`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
// Statistics are updated asynchronously
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
// And also cached and lazy loaded
h.Clear()
require.NoError(t, h.Update(context.Background(), dom.InfoSchema()))
Expand Down
22 changes: 1 addition & 21 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2438,9 +2438,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
do.wg.Run(func() {
do.handleDDLEvent()
}, "handleDDLEvent")
do.ddlNotifier.RegisterHandler(notifier.StatsMetaHandlerID, do.StatsHandle().HandleDDLEvent)
// Wait for the stats worker to finish the initialization.
// Otherwise, we may start the auto analyze worker before the stats cache is initialized.
do.wg.Run(
Expand Down Expand Up @@ -2640,24 +2638,6 @@ func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle)
}
}

func (do *Domain) handleDDLEvent() {
logutil.BgLogger().Info("handleDDLEvent started.")
defer util.Recover(metrics.LabelDomain, "handleDDLEvent", nil, false)
statsHandle := do.StatsHandle()
for {
select {
case <-do.exit:
return
// This channel is sent only by ddl owner.
case t := <-statsHandle.DDLEventCh():
err := statsHandle.HandleDDLEvent(t)
if err != nil {
logutil.BgLogger().Error("handle ddl event failed", zap.String("event", t.String()), zap.Error(err))
}
}
}
}

func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ go_test(
"//pkg/sessiontxn/staleread",
"//pkg/statistics",
"//pkg/statistics/handle/storage",
"//pkg/statistics/handle/util",
"//pkg/statistics/util",
"//pkg/store/copr",
"//pkg/store/driver/error",
Expand Down
27 changes: 23 additions & 4 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
Expand Down Expand Up @@ -173,7 +176,11 @@ func TestDataForTableStatsField(t *testing.T) {
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (c int, d int, e char(5), index idx(e))")
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check(
testkit.Rows("0 0 0 0"))
tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`)
Expand All @@ -200,7 +207,11 @@ func TestDataForTableStatsField(t *testing.T) {
// Test partition table.
tk.MustExec("drop table if exists t")
tk.MustExec(`CREATE TABLE t (a int, b int, c varchar(5), primary key(a), index idx(c)) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16))`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err = util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
Expand All @@ -219,7 +230,11 @@ func TestPartitionsTable(t *testing.T) {
testkit.WithPruneMode(tk, variable.Static, func() {
tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;")
tk.MustExec(`CREATE TABLE test_partitions (a int, b int, c varchar(5), primary key(a), index idx(c)) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)

tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check(
Expand All @@ -245,7 +260,11 @@ func TestPartitionsTable(t *testing.T) {
// Test for table has no partitions.
tk.MustExec("DROP TABLE IF EXISTS `test_partitions_1`;")
tk.MustExec(`CREATE TABLE test_partitions_1 (a int, b int, c varchar(5), primary key(a), index idx(c));`)
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
tk.MustExec(`insert into test_partitions_1(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`)
require.NoError(t, h.DumpStatsDeltaToKV(true))
require.NoError(t, h.Update(context.Background(), is))
Expand Down
1 change: 1 addition & 0 deletions pkg/executor/test/analyzetest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/util",
"//pkg/testkit",
"//pkg/testkit/analyzehelper",
"//pkg/util/dbterror/exeerrors",
Expand Down
19 changes: 16 additions & 3 deletions pkg/executor/test/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/analyzehelper"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
Expand Down Expand Up @@ -496,7 +497,11 @@ func TestAdjustSampleRateNote(t *testing.T) {
statsHandle := domain.GetDomain(tk.Session().(sessionctx.Context)).StatsHandle()
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, index index_a(a))")
require.NoError(t, statsHandle.HandleDDLEvent(<-statsHandle.DDLEventCh()))
err := util.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return statsHandle.HandleDDLEvent(ctx, sctx, <-statsHandle.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
is := tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema)
tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
Expand Down Expand Up @@ -2719,7 +2724,11 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) {
tk.MustExec("create table t(a int)")
analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a")
h := dom.StatsHandle()
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tid := tbl.Meta().ID
Expand Down Expand Up @@ -2836,7 +2845,11 @@ func TestAnalyzeMVIndex(t *testing.T) {
"index ij_binary((cast(j->'$.bin' as binary(50) array)))," +
"index ij_char((cast(j->'$.char' as char(50) array)))" +
")")
require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh()))
err := util.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier)
return h.HandleDDLEvent(ctx, sctx, <-h.DDLEventCh())
}, util.FlagWrapTxn)
require.NoError(t, err)
jsonData := []map[string]any{
{
"signed": []int64{1, 2, 300, 300, 0, 4, 5, -40000},
Expand Down
1 change: 1 addition & 0 deletions pkg/planner/cardinality/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_test(
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/statistics",
"//pkg/statistics/handle/util",
"//pkg/testkit",
"//pkg/testkit/testdata",
"//pkg/testkit/testmain",
Expand Down
Loading

0 comments on commit 39eebe4

Please sign in to comment.