diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 912eaa271e4a5..1d23f316dd14f 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -588,18 +588,14 @@ func asyncNotifyEvent(jobCtx *jobContext, e *notifier.SchemaChangeEvent, job *mo return nil } - ch := jobCtx.oldDDLCtx.ddlEventCh - if ch != nil { - forLoop: - for i := 0; i < 10; i++ { - select { - case ch <- e: - break forLoop - default: - time.Sleep(time.Microsecond * 10) - } + // In test environments, we use a channel-based approach to handle DDL events. + // This maintains compatibility with existing test cases that expect events to be delivered through channels. + // In production, DDL events are handled by the notifier system instead. + if intest.InTest { + ch := jobCtx.oldDDLCtx.ddlEventCh + if ch != nil { + ch <- e } - logutil.DDLLogger().Warn("fail to notify DDL event", zap.Stringer("event", e)) } intest.Assert(jobCtx.eventPublishStore != nil, "eventPublishStore should not be nil") diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel index 477577df5752b..2650bd7b3883b 100644 --- a/pkg/ddl/tests/partition/BUILD.bazel +++ b/pkg/ddl/tests/partition/BUILD.bazel @@ -33,6 +33,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/sessiontxn", + "//pkg/statistics/handle/ddl/testutil", "//pkg/store/gcworker", "//pkg/store/mockstore", "//pkg/table", diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index ddf7e18d91f7d..a4b14b38cf836 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tables" @@ -2947,7 +2948,8 @@ func TestRemoveKeyPartitioning(t *testing.T) { tk.MustExec("create database RemovePartitioning") tk.MustExec("use RemovePartitioning") tk.MustExec(`create table t (a varchar(255), b varchar(255), key (a,b), key (b)) partition by key (a) partitions 7`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // Fill the data with ascii strings for i := 32; i <= 126; i++ { tk.MustExec(fmt.Sprintf(`insert into t values (char(%d,%d,%d),char(%d,%d,%d,%d))`, i, i, i, i, i, i, i)) @@ -2979,7 +2981,8 @@ func TestRemoveKeyPartitioning(t *testing.T) { " KEY `b` (`b`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) // Statistics are updated asynchronously - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // And also cached and lazy loaded h.Clear() require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) @@ -2997,7 +3000,8 @@ func TestRemoveListPartitioning(t *testing.T) { tk.MustExec("create database RemoveListPartitioning") tk.MustExec("use RemoveListPartitioning") tk.MustExec(`create table t (a int, b varchar(255), key (a,b), key (b)) partition by list (a) (partition p0 values in (0), partition p1 values in (1), partition p2 values in (2), partition p3 values in (3), partition p4 values in (4))`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // Fill the data with ascii strings for i := 32; i <= 126; i++ { tk.MustExec(fmt.Sprintf(`insert into t values (%d,char(%d,%d,%d,%d))`, i%5, i, i, i, i)) @@ -3025,7 +3029,8 @@ func TestRemoveListPartitioning(t *testing.T) { " KEY `b` (`b`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) // Statistics are updated asynchronously - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // And also cached and lazy loaded h.Clear() require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) @@ -3043,7 +3048,8 @@ func TestRemoveListColumnPartitioning(t *testing.T) { tk.MustExec("create database RemoveListPartitioning") tk.MustExec("use RemoveListPartitioning") tk.MustExec(`create table t (a varchar(255), b varchar(255), key (a,b), key (b)) partition by list columns (a) (partition p0 values in ("0"), partition p1 values in ("1"), partition p2 values in ("2"), partition p3 values in ("3"), partition p4 values in ("4"))`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // Fill the data with ascii strings for i := 32; i <= 126; i++ { tk.MustExec(fmt.Sprintf(`insert into t values ("%d",char(%d,%d,%d,%d))`, i%5, i, i, i, i)) @@ -3071,7 +3077,8 @@ func TestRemoveListColumnPartitioning(t *testing.T) { " KEY `b` (`b`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) // Statistics are updated asynchronously - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // And also cached and lazy loaded h.Clear() require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) @@ -3089,7 +3096,8 @@ func TestRemoveListColumnsPartitioning(t *testing.T) { tk.MustExec("create database RemoveListPartitioning") tk.MustExec("use RemoveListPartitioning") tk.MustExec(`create table t (a int, b varchar(255), key (a,b), key (b)) partition by list columns (a,b) (partition p0 values in ((0,"0")), partition p1 values in ((1,"1")), partition p2 values in ((2,"2")), partition p3 values in ((3,"3")), partition p4 values in ((4,"4")))`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // Fill the data for i := 32; i <= 126; i++ { tk.MustExec(fmt.Sprintf(`insert into t values (%d,"%d")`, i%5, i%5)) @@ -3117,7 +3125,8 @@ func TestRemoveListColumnsPartitioning(t *testing.T) { " KEY `b` (`b`)\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) // Statistics are updated asynchronously - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // And also cached and lazy loaded h.Clear() require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index a8d3df9895bcd..8c437a03cfe4d 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2449,9 +2449,6 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err // This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations. // These tasks do not interfere with or depend on the initialization process. do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker") - do.wg.Run(func() { - do.handleDDLEvent() - }, "handleDDLEvent") // Wait for the stats worker to finish the initialization. // Otherwise, we may start the auto analyze worker before the stats cache is initialized. do.wg.Run( @@ -2651,24 +2648,6 @@ func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) } } -func (do *Domain) handleDDLEvent() { - logutil.BgLogger().Info("handleDDLEvent started.") - defer util.Recover(metrics.LabelDomain, "handleDDLEvent", nil, false) - statsHandle := do.StatsHandle() - for { - select { - case <-do.exit: - return - // This channel is sent only by ddl owner. - case t := <-statsHandle.DDLEventCh(): - err := statsHandle.HandleDDLEvent(t) - if err != nil { - logutil.BgLogger().Error("handle ddl event failed", zap.String("event", t.String()), zap.Error(err)) - } - } - } -} - func (do *Domain) updateStatsWorker(_ sessionctx.Context) { defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false) logutil.BgLogger().Info("updateStatsWorker started.") diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 6a300d21ee529..06597904384b7 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -438,6 +438,7 @@ go_test( "//pkg/sessiontxn", "//pkg/sessiontxn/staleread", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/storage", "//pkg/statistics/util", "//pkg/store/copr", diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index 644896b064833..2962fb85d318c 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/sessionctx/variable" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" @@ -173,7 +174,8 @@ func TestDataForTableStatsField(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c int, d int, e char(5), index idx(e))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( testkit.Rows("0 0 0 0")) tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`) @@ -200,7 +202,8 @@ func TestDataForTableStatsField(t *testing.T) { // Test partition table. tk.MustExec("drop table if exists t") tk.MustExec(`CREATE TABLE t (a int, b int, c varchar(5), primary key(a), index idx(c)) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16))`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) @@ -219,7 +222,8 @@ func TestPartitionsTable(t *testing.T) { testkit.WithPruneMode(tk, variable.Static, func() { tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;") tk.MustExec(`CREATE TABLE test_partitions (a int, b int, c varchar(5), primary key(a), index idx(c)) PARTITION BY RANGE (a) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`) tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check( @@ -245,7 +249,8 @@ func TestPartitionsTable(t *testing.T) { // Test for table has no partitions. tk.MustExec("DROP TABLE IF EXISTS `test_partitions_1`;") tk.MustExec(`CREATE TABLE test_partitions_1 (a int, b int, c varchar(5), primary key(a), index idx(c));`) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec(`insert into test_partitions_1(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) diff --git a/pkg/executor/test/analyzetest/BUILD.bazel b/pkg/executor/test/analyzetest/BUILD.bazel index b9dde41d4ef75..7fb0b111fa4e3 100644 --- a/pkg/executor/test/analyzetest/BUILD.bazel +++ b/pkg/executor/test/analyzetest/BUILD.bazel @@ -27,6 +27,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/testkit/analyzehelper", "//pkg/util/dbterror/exeerrors", diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index 96428dd711c4a..669720ff30d48 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -39,6 +39,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/analyzehelper" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" @@ -496,7 +497,8 @@ func TestAdjustSampleRateNote(t *testing.T) { statsHandle := domain.GetDomain(tk.Session().(sessionctx.Context)).StatsHandle() tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, index index_a(a))") - require.NoError(t, statsHandle.HandleDDLEvent(<-statsHandle.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(statsHandle) + require.NoError(t, err) is := tk.Session().(sessionctx.Context).GetInfoSchema().(infoschema.InfoSchema) tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) @@ -2719,7 +2721,8 @@ func TestAutoAnalyzeAwareGlobalVariableChange(t *testing.T) { tk.MustExec("create table t(a int)") analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) tid := tbl.Meta().ID @@ -2836,7 +2839,8 @@ func TestAnalyzeMVIndex(t *testing.T) { "index ij_binary((cast(j->'$.bin' as binary(50) array)))," + "index ij_char((cast(j->'$.char' as char(50) array)))" + ")") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) jsonData := []map[string]any{ { "signed": []int64{1, 2, 300, 300, 0, 4, 5, -40000}, diff --git a/pkg/planner/cardinality/BUILD.bazel b/pkg/planner/cardinality/BUILD.bazel index 36657805f4bce..e478223ec4334 100644 --- a/pkg/planner/cardinality/BUILD.bazel +++ b/pkg/planner/cardinality/BUILD.bazel @@ -80,6 +80,7 @@ go_test( "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/testkit/testdata", "//pkg/testkit/testmain", diff --git a/pkg/planner/cardinality/selectivity_test.go b/pkg/planner/cardinality/selectivity_test.go index 898e932eed31e..6966bad0de0c7 100644 --- a/pkg/planner/cardinality/selectivity_test.go +++ b/pkg/planner/cardinality/selectivity_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" "github.com/pingcap/tidb/pkg/types" @@ -175,7 +176,8 @@ func TestOutOfRangeEstimationAfterDelete(t *testing.T) { testKit.MustExec("use test") testKit.MustExec("drop table if exists t") testKit.MustExec("create table t(a int unsigned)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // [300, 900) // 5 rows for each value, 3000 rows in total. for i := 0; i < 3000; i++ { @@ -898,7 +900,8 @@ func TestGlobalStatsOutOfRangeEstimationAfterDelete(t *testing.T) { "partition p2 values less than (800)," + "partition p3 values less than (1000)," + "partition p4 values less than (1200))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) for i := 0; i < 3000; i++ { testKit.MustExec(fmt.Sprintf("insert into t values (%v)", i/5+300)) // [300, 900) } @@ -1005,7 +1008,8 @@ func TestIndexJoinInnerRowCountUpperBound(t *testing.T) { testKit.MustExec("use test") testKit.MustExec("drop table if exists t") testKit.MustExec("create table t(a int, b int, index idx(b))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) is := dom.InfoSchema() tb, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) @@ -1076,7 +1080,8 @@ func TestOrderingIdxSelectivityThreshold(t *testing.T) { testKit.MustExec("use test") testKit.MustExec("drop table if exists t") testKit.MustExec("create table t(a int primary key , b int, c int, index ib(b), index ic(c))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) is := dom.InfoSchema() tb, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) @@ -1159,7 +1164,8 @@ func TestOrderingIdxSelectivityRatio(t *testing.T) { testKit.MustExec("use test") testKit.MustExec("drop table if exists t") testKit.MustExec("create table t(a int primary key, b int, c int, index ib(b), index ic(c))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) is := dom.InfoSchema() tb, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) @@ -1240,7 +1246,8 @@ func TestCrossValidationSelectivity(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 1") tk.MustExec("create table t (a int, b int, c int, primary key (a, b) clustered)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1,2,3), (1,4,5)") require.NoError(t, h.DumpStatsDeltaToKV(true)) tk.MustExec("analyze table t") @@ -1257,7 +1264,8 @@ func TestIgnoreRealtimeStats(t *testing.T) { testKit.MustExec("drop table if exists t") testKit.MustExec("create table t(a int, b int)") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) // 1. Insert 11 rows of data without ANALYZE. testKit.MustExec("insert into t values(1,1),(1,2),(1,3),(1,4),(1,5),(2,1),(2,2),(2,3),(2,4),(2,5),(3,1)") @@ -1371,7 +1379,8 @@ func TestBuiltinInEstWithoutStats(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t(a int, b int)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8), (9,9), (10,10)") require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() diff --git a/pkg/planner/cardinality/trace_test.go b/pkg/planner/cardinality/trace_test.go index f61fe7932e43e..1b8a4ac370f72 100644 --- a/pkg/planner/cardinality/trace_test.go +++ b/pkg/planner/cardinality/trace_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" "github.com/pingcap/tidb/pkg/planner/core/resolve" "github.com/pingcap/tidb/pkg/sessionctx" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" "github.com/pingcap/tidb/pkg/util/tracing" @@ -135,7 +136,8 @@ func TestTraceDebugSelectivity(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, b int, index iab(a, b), index ib(b))") - require.NoError(t, statsHandle.HandleDDLEvent(<-statsHandle.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(statsHandle) + require.NoError(t, err) // Prepare the data. @@ -188,7 +190,7 @@ func TestTraceDebugSelectivity(t *testing.T) { sql := "explain " + tt tk.MustExec(sql) } - err := statsHandle.LoadNeededHistograms(dom.InfoSchema()) + err = statsHandle.LoadNeededHistograms(dom.InfoSchema()) require.NoError(t, err) sctx := tk.Session().(sessionctx.Context) diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index 417addad48f56..9c4a71f20b1bc 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -300,6 +300,7 @@ go_test( "//pkg/sessionctx/variable", "//pkg/sessiontxn", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/store/mockstore", "//pkg/table", "//pkg/testkit", diff --git a/pkg/planner/core/casetest/BUILD.bazel b/pkg/planner/core/casetest/BUILD.bazel index bdfed3c28d0a0..96a9a71181d6a 100644 --- a/pkg/planner/core/casetest/BUILD.bazel +++ b/pkg/planner/core/casetest/BUILD.bazel @@ -24,6 +24,7 @@ go_test( "//pkg/planner/core/operator/logicalop", "//pkg/planner/core/resolve", "//pkg/planner/property", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/testkit/testdata", "//pkg/testkit/testmain", diff --git a/pkg/planner/core/casetest/cbotest/BUILD.bazel b/pkg/planner/core/casetest/cbotest/BUILD.bazel index 3bd8ef10f6d32..255649ae34d83 100644 --- a/pkg/planner/core/casetest/cbotest/BUILD.bazel +++ b/pkg/planner/core/casetest/cbotest/BUILD.bazel @@ -22,6 +22,7 @@ go_test( "//pkg/session", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/util", "//pkg/testkit", "//pkg/testkit/testdata", diff --git a/pkg/planner/core/casetest/cbotest/cbo_test.go b/pkg/planner/core/casetest/cbotest/cbo_test.go index e80f13898a3ac..e66e9cf5a1cc0 100644 --- a/pkg/planner/core/casetest/cbotest/cbo_test.go +++ b/pkg/planner/core/casetest/cbotest/cbo_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" @@ -65,8 +66,11 @@ func TestCBOWithoutAnalyze(t *testing.T) { testKit.MustExec("create table t1 (a int)") testKit.MustExec("create table t2 (a int)") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) + + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) testKit.MustExec("insert into t1 values (1), (2), (3), (4), (5), (6)") testKit.MustExec("insert into t2 values (1), (2), (3), (4), (5), (6)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -95,7 +99,8 @@ func TestStraightJoin(t *testing.T) { h := dom.StatsHandle() for _, tblName := range []string{"t1", "t2", "t3", "t4"} { testKit.MustExec(fmt.Sprintf("create table %s (a int)", tblName)) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) } var input []string var output [][]string @@ -117,8 +122,8 @@ func TestTableDual(t *testing.T) { h := dom.StatsHandle() testKit.MustExec(`create table t(a int)`) testKit.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (7), (8), (9), (10)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) - + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) var input []string @@ -150,7 +155,8 @@ func TestEstimation(t *testing.T) { testKit.MustExec("insert into t select * from t") testKit.MustExec("insert into t select * from t") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) testKit.MustExec("analyze table t all columns") for i := 1; i <= 8; i++ { @@ -336,7 +342,8 @@ func TestOutdatedAnalyze(t *testing.T) { testKit.MustExec(fmt.Sprintf("insert into t values (%d,%d)", i, i)) } h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) testKit.MustExec("analyze table t all columns") testKit.MustExec("insert into t select * from t") diff --git a/pkg/planner/core/casetest/integration_test.go b/pkg/planner/core/casetest/integration_test.go index 9793d1c3374c1..72b00393b6d83 100644 --- a/pkg/planner/core/casetest/integration_test.go +++ b/pkg/planner/core/casetest/integration_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" "github.com/stretchr/testify/require" @@ -225,7 +226,8 @@ func TestIssue32632(t *testing.T) { "`S_COMMENT` varchar(101) NOT NULL," + "PRIMARY KEY (`S_SUPPKEY`) /*T![clustered_index] CLUSTERED */)") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("set @@tidb_enforce_mpp = 1") tbl1, err := dom.InfoSchema().TableByName(context.Background(), pmodel.CIStr{O: "test", L: "test"}, pmodel.CIStr{O: "partsupp", L: "partsupp"}) diff --git a/pkg/planner/core/casetest/vectorsearch/BUILD.bazel b/pkg/planner/core/casetest/vectorsearch/BUILD.bazel index ca2f66ab0a98c..414d2e58e214a 100644 --- a/pkg/planner/core/casetest/vectorsearch/BUILD.bazel +++ b/pkg/planner/core/casetest/vectorsearch/BUILD.bazel @@ -21,6 +21,7 @@ go_test( "//pkg/planner/core/base", "//pkg/planner/core/resolve", "//pkg/session", + "//pkg/statistics/handle/ddl/testutil", "//pkg/store/mockstore", "//pkg/testkit", "//pkg/testkit/testdata", diff --git a/pkg/planner/core/casetest/vectorsearch/vector_index_test.go b/pkg/planner/core/casetest/vectorsearch/vector_index_test.go index aed89b570593c..68c0e4aacaa37 100644 --- a/pkg/planner/core/casetest/vectorsearch/vector_index_test.go +++ b/pkg/planner/core/casetest/vectorsearch/vector_index_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/planner/core/resolve" "github.com/pingcap/tidb/pkg/session" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" @@ -88,7 +89,8 @@ func TestTiFlashANNIndex(t *testing.T) { dom := domain.GetDomain(tk.Session()) testkit.SetTiFlashReplica(t, dom, "test", "t1") handle := dom.StatsHandle() - require.NoError(t, handle.HandleDDLEvent(<-handle.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(handle) + require.NoError(t, err) tk.MustExec("analyze table t1") tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'") diff --git a/pkg/planner/core/indexmerge_intersection_test.go b/pkg/planner/core/indexmerge_intersection_test.go index b4f97c66559c0..c9a56342c6a2f 100644 --- a/pkg/planner/core/indexmerge_intersection_test.go +++ b/pkg/planner/core/indexmerge_intersection_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/pingcap/tidb/pkg/planner/core" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" "github.com/stretchr/testify/require" @@ -147,7 +148,8 @@ func TestHintForIntersectionIndexMerge(t *testing.T) { tk.MustExec("insert into t8 values('啊aabbccdd', 'abcc', 'cccc', 'aa', '2,test')," + "('啊aabb', 'abcdc', 'aaaa', '??', '2')") - require.NoError(t, handle.HandleDDLEvent(<-handle.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(handle) + require.NoError(t, err) require.Nil(t, handle.Update(context.Background(), domain.InfoSchema())) tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") tk.MustExec("analyze table t1,t2,t3,t4") diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 5d543393cdb6c..e9fa725fe3703 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -180,6 +180,7 @@ go_test( "//pkg/session", "//pkg/sessionctx/variable", "//pkg/sessiontxn", + "//pkg/statistics/handle/ddl/testutil", "//pkg/store/mockstore", "//pkg/store/mockstore/unistore", "//pkg/testkit", diff --git a/pkg/server/handler/optimizor/BUILD.bazel b/pkg/server/handler/optimizor/BUILD.bazel index 326afbaefa59c..e91a85c639470 100644 --- a/pkg/server/handler/optimizor/BUILD.bazel +++ b/pkg/server/handler/optimizor/BUILD.bazel @@ -56,6 +56,7 @@ go_test( "//pkg/server/internal/testutil", "//pkg/server/internal/util", "//pkg/session", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/types", "//pkg/statistics/util", "//pkg/store/mockstore/unistore", diff --git a/pkg/server/handler/optimizor/optimize_trace_test.go b/pkg/server/handler/optimizor/optimize_trace_test.go index b10cf19a68118..b50c79ab88435 100644 --- a/pkg/server/handler/optimizor/optimize_trace_test.go +++ b/pkg/server/handler/optimizor/optimize_trace_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) @@ -87,7 +88,7 @@ func prepareData4OptimizeTrace(t *testing.T, client *testserverclient.TestServer tk.MustExec("create database optimizeTrace") tk.MustExec("use optimizeTrace") tk.MustExec("create table t(a int)") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) rows := tk.MustQuery("trace plan select * from t") require.True(t, rows.Next(), "unexpected data") diff --git a/pkg/server/handler/optimizor/plan_replayer_test.go b/pkg/server/handler/optimizor/plan_replayer_test.go index 59e1b6931050f..dd053ea48f469 100644 --- a/pkg/server/handler/optimizor/plan_replayer_test.go +++ b/pkg/server/handler/optimizor/plan_replayer_test.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" util2 "github.com/pingcap/tidb/pkg/statistics/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/replayer" @@ -218,7 +219,7 @@ func prepareData4PlanReplayer(t *testing.T, client *testserverclient.TestServerC tk.MustExec("CREATE TABLE authors (id INT PRIMARY KEY AUTO_INCREMENT,name VARCHAR(100) NOT NULL,email VARCHAR(100) UNIQUE NOT NULL);") tk.MustExec("CREATE TABLE books (id INT PRIMARY KEY AUTO_INCREMENT,title VARCHAR(200) NOT NULL,publication_date DATE NOT NULL,author_id INT,FOREIGN KEY (author_id) REFERENCES authors(id) ON DELETE CASCADE);") tk.MustExec("create table tt(a int, b varchar(10)) PARTITION BY HASH(a) PARTITIONS 4;") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) tk.MustExec("insert into t values(1), (2), (3), (4)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -430,7 +431,8 @@ func prepareData4Issue43192(t *testing.T, client *testserverclient.TestServerCli tk.MustExec("create database planReplayer") tk.MustExec("use planReplayer") tk.MustExec("create table t(a int, b int, INDEX ia (a), INDEX ib (b)) PARTITION BY HASH(a) PARTITIONS 4;") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("INSERT INTO t (a, b) VALUES (1, 1), (2, 2), (3, 3), (4, 4),(5, 5), (6, 6), (7, 7), (8, 8),(9, 9), (10, 10), (11, 11), (12, 12),(13, 13), (14, 14), (15, 15), (16, 16),(17, 17), (18, 18), (19, 19), (20, 20),(21, 21), (22, 22), (23, 23), (24, 24),(25, 25), (26, 26), (27, 27), (28, 28),(29, 29), (30, 30), (31, 31), (32, 32),(33, 33), (34, 34), (35, 35), (36, 36),(37, 37), (38, 38), (39, 39), (40, 40),(41, 41), (42, 42), (43, 43), (44, 44),(45, 45), (46, 46), (47, 47), (48, 48),(49, 49), (50, 50), (51, 51), (52, 52),(53, 53), (54, 54), (55, 55), (56, 56),(57, 57), (58, 58), (59, 59), (60, 60),(61, 61), (62, 62), (63, 63), (64, 64),(65, 65), (66, 66), (67, 67), (68, 68),(69, 69), (70, 70), (71, 71), (72, 72),(73, 73), (74, 74), (75, 75), (76, 76),(77, 77), (78, 78), (79, 79), (80, 80),(81, 81), (82, 82), (83, 83), (84, 84),(85, 85), (86, 86), (87, 87), (88, 88),(89, 89), (90, 90), (91, 91), (92, 92),(93, 93), (94, 94), (95, 95), (96, 96),(97, 97), (98, 98), (99, 99), (100, 100);") require.NoError(t, h.DumpStatsDeltaToKV(true)) tk.MustExec("analyze table t") @@ -466,13 +468,13 @@ func prepareData4Issue56458(t *testing.T, client *testserverclient.TestServerCli "FOLLOWERS=3 " + "FOLLOWER_CONSTRAINTS=\"[+disk=ssd]\"") tk.MustExec("CREATE TABLE v(id INT PRIMARY KEY AUTO_INCREMENT);") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) tk.MustExec("create table planReplayer2.t(a int, b int, INDEX ia (a), INDEX ib (b), author_id int, FOREIGN KEY (author_id) REFERENCES planReplayer.v(id) ON DELETE CASCADE);") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) tk.MustExec("create table t(a int, b int, INDEX ia (a), INDEX ib (b), author_id int, FOREIGN KEY (author_id) REFERENCES planReplayer2.t(a) ON DELETE CASCADE) placement policy p;") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) tk.MustExec("create global binding for select a, b from t where a in (1, 2, 3) using select a, b from t use index (ib) where a in (1, 2, 3)") diff --git a/pkg/server/handler/optimizor/statistics_handler_test.go b/pkg/server/handler/optimizor/statistics_handler_test.go index cf479cc0b439b..56d8b5b9a8be4 100644 --- a/pkg/server/handler/optimizor/statistics_handler_test.go +++ b/pkg/server/handler/optimizor/statistics_handler_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/types" statsutil "github.com/pingcap/tidb/pkg/statistics/util" "github.com/pingcap/tidb/pkg/testkit" @@ -151,7 +152,7 @@ func prepareData(t *testing.T, client *testserverclient.TestServerClient, statHa tk.MustExec("create database tidb") tk.MustExec("use tidb") tk.MustExec("create table test (a int, b varchar(20))") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) tk.MustExec("create index c on test (a, b)") tk.MustExec("insert test values (1, 's')") diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 00b4480697ce1..b85f342d4a946 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/pkg/server/internal" "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testdata" "github.com/pingcap/tidb/pkg/util/arena" @@ -68,7 +69,8 @@ func TestOptimizerDebugTrace(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t (col1 int, index i(col1))") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("plan replayer capture '0595c79f25d183319d0830ff8ca538c9054cbf407e5e27488b5dc40e4738a7c8' '*'") tk.MustExec("plan replayer capture 'c0fcc0abbaaffcaafe21115a3c67ae5d96a188cc197559953d2865ea6852d3cc' '*'") diff --git a/pkg/statistics/BUILD.bazel b/pkg/statistics/BUILD.bazel index cad48db7936fe..29fa59bd61302 100644 --- a/pkg/statistics/BUILD.bazel +++ b/pkg/statistics/BUILD.bazel @@ -91,6 +91,7 @@ go_test( "//pkg/planner/core/resolve", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/testkit/analyzehelper", "//pkg/testkit/testdata", diff --git a/pkg/statistics/handle/BUILD.bazel b/pkg/statistics/handle/BUILD.bazel index 671010338607a..3831a47f64388 100644 --- a/pkg/statistics/handle/BUILD.bazel +++ b/pkg/statistics/handle/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/types", "//pkg/util", "//pkg/util/chunk", + "//pkg/util/intest", "//pkg/util/logutil", "//pkg/util/memory", "@com_github_pingcap_errors//:errors", diff --git a/pkg/statistics/handle/autoanalyze/BUILD.bazel b/pkg/statistics/handle/autoanalyze/BUILD.bazel index 5bc8ae050e6d2..ae7b9f57d2fb7 100644 --- a/pkg/statistics/handle/autoanalyze/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/BUILD.bazel @@ -49,6 +49,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/util", "//pkg/statistics/handle/util/test", "//pkg/store/mockstore", diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go index 30ec02066408d..fd18834fdfffb 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go @@ -31,7 +31,8 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze" - statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" + "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/statistics/handle/util/test" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" @@ -62,7 +63,7 @@ func TestEnableAutoAnalyzePriorityQueue(t *testing.T) { tk.MustExec("SET GLOBAL tidb_enable_auto_analyze_priority_queue=ON") require.True(t, variable.EnableAutoAnalyzePriorityQueue.Load()) h := dom.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() @@ -81,7 +82,7 @@ func TestAutoAnalyzeLockedTable(t *testing.T) { tk.MustExec("create table t (a int)") tk.MustExec("insert into t values (1)") h := dom.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) // Lock the table. @@ -112,7 +113,7 @@ func TestAutoAnalyzeWithPredicateColumns(t *testing.T) { tk.MustExec("insert into t values (1, 1)") tk.MustQuery("select * from t where a > 0").Check(testkit.Rows("1 1")) h := dom.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpColStatsUsageToKV()) require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -161,7 +162,7 @@ func disableAutoAnalyzeCase(t *testing.T, tk *testkit.TestKit, dom *domain.Domai tk.MustExec("create table t (a int)") tk.MustExec("insert into t values (1)") h := dom.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() @@ -200,7 +201,7 @@ func TestAutoAnalyzeOnChangeAnalyzeVer(t *testing.T) { statistics.AutoAnalyzeMinCnt = 1000 }() h := do.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) is := do.InfoSchema() @@ -243,7 +244,7 @@ func TestAutoAnalyzeOnChangeAnalyzeVer(t *testing.T) { // Add a new table after the analyze version set to 2. tk.MustExec("create table tt(a int, index idx(a))") tk.MustExec("insert into tt values(1), (2), (3), (4), (5)") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) is = do.InfoSchema() @@ -506,7 +507,7 @@ func TestCleanupCorruptedAnalyzeJobsOnCurrentInstance(t *testing.T) { // Set up the mock function to return the row exec.EXPECT().ExecRestrictedSQL( gomock.All(&test.CtxMatcher{}), - statsutil.UseCurrentSessionOpt, + util.UseCurrentSessionOpt, autoanalyze.SelectAnalyzeJobsOnCurrentInstanceSQL, "127.0.0.1:4000", gomock.Any(), @@ -514,7 +515,7 @@ func TestCleanupCorruptedAnalyzeJobsOnCurrentInstance(t *testing.T) { exec.EXPECT().ExecRestrictedSQL( gomock.All(&test.CtxMatcher{}), - statsutil.UseCurrentSessionOpt, + util.UseCurrentSessionOpt, autoanalyze.BatchUpdateAnalyzeJobSQL, []any{[]string{"1"}}, ).Return(nil, nil, nil) @@ -531,14 +532,14 @@ func TestCleanupCorruptedAnalyzeJobsOnCurrentInstance(t *testing.T) { // Set up the mock function to return the row exec.EXPECT().ExecRestrictedSQL( gomock.All(&test.CtxMatcher{}), - statsutil.UseCurrentSessionOpt, + util.UseCurrentSessionOpt, autoanalyze.SelectAnalyzeJobsOnCurrentInstanceSQL, "127.0.0.1:4000", ).Return(rows, nil, nil) exec.EXPECT().ExecRestrictedSQL( gomock.All(&test.CtxMatcher{}), - statsutil.UseCurrentSessionOpt, + util.UseCurrentSessionOpt, autoanalyze.BatchUpdateAnalyzeJobSQL, []any{[]string{"1", "3"}}, ).Return(nil, nil, nil) @@ -588,14 +589,14 @@ func TestCleanupCorruptedAnalyzeJobsOnDeadInstances(t *testing.T) { // Set up the mock function to return the row exec.EXPECT().ExecRestrictedSQL( gomock.All(&test.CtxMatcher{}), - statsutil.UseCurrentSessionOpt, + util.UseCurrentSessionOpt, autoanalyze.SelectAnalyzeJobsSQL, gomock.Any(), ).Return(rows, nil, nil) exec.EXPECT().ExecRestrictedSQL( gomock.All(&test.CtxMatcher{}), - statsutil.UseCurrentSessionOpt, + util.UseCurrentSessionOpt, autoanalyze.BatchUpdateAnalyzeJobSQL, []any{[]string{"2"}}, ).Return(nil, nil, nil) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel index 93559021972ab..993bc7491fabc 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/BUILD.bazel @@ -68,6 +68,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/sysproctrack", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", "//pkg/store/mockstore", diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go index 2e82c1300e45b..e5188078af7dd 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" @@ -128,10 +129,10 @@ func TestHandleDDLEventsWithRunningJobs(t *testing.T) { addIndexEvent := findEvent(handle.DDLEventCh(), model.ActionAddIndex) // Handle the add index event. - require.NoError(t, handle.HandleDDLEvent(addIndexEvent)) + err = statstestutil.HandleDDLEventWithTxn(handle, addIndexEvent) + require.NoError(t, err) // Handle the add index event in priority queue. - require.NoError(t, statsutil.CallWithSCtx(handle.SPool(), func(sctx sessionctx.Context) error { return pq.HandleDDLEvent(ctx, sctx, addIndexEvent) }, statsutil.FlagWrapTxn)) @@ -200,7 +201,8 @@ func TestTruncateTable(t *testing.T) { truncateTableEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTable) // Handle the truncate table event. - require.NoError(t, h.HandleDDLEvent(truncateTableEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, truncateTableEvent) + require.NoError(t, err) ctx := context.Background() sctx := testKit.Session().(sessionctx.Context) @@ -261,7 +263,8 @@ func testTruncatePartitionedTable( truncateTableEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTable) // Handle the truncate table event. - require.NoError(t, h.HandleDDLEvent(truncateTableEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, truncateTableEvent) + require.NoError(t, err) ctx := context.Background() sctx := testKit.Session().(sessionctx.Context) @@ -311,7 +314,8 @@ func TestDropTable(t *testing.T) { dropTableEvent := findEvent(h.DDLEventCh(), model.ActionDropTable) // Handle the drop table event. - require.NoError(t, h.HandleDDLEvent(dropTableEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, dropTableEvent) + require.NoError(t, err) ctx := context.Background() sctx := testKit.Session().(sessionctx.Context) @@ -372,7 +376,8 @@ func testDropPartitionedTable( dropTableEvent := findEvent(h.DDLEventCh(), model.ActionDropTable) // Handle the drop table event. - require.NoError(t, h.HandleDDLEvent(dropTableEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, dropTableEvent) + require.NoError(t, err) ctx := context.Background() sctx := testKit.Session().(sessionctx.Context) @@ -425,7 +430,8 @@ func TestTruncateTablePartition(t *testing.T) { truncateTablePartitionEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTablePartition) // Handle the truncate table partition event. - require.NoError(t, h.HandleDDLEvent(truncateTablePartitionEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, truncateTablePartitionEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -483,7 +489,8 @@ func TestDropTablePartition(t *testing.T) { dropTablePartitionEvent := findEvent(h.DDLEventCh(), model.ActionDropTablePartition) // Handle the drop table partition event. - require.NoError(t, h.HandleDDLEvent(dropTablePartitionEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, dropTablePartitionEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -547,7 +554,8 @@ func TestExchangeTablePartition(t *testing.T) { exchangeTablePartitionEvent := findEvent(h.DDLEventCh(), model.ActionExchangeTablePartition) // Handle the exchange table partition event. - require.NoError(t, h.HandleDDLEvent(exchangeTablePartitionEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, exchangeTablePartitionEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -608,7 +616,8 @@ func TestReorganizeTablePartition(t *testing.T) { reorganizeTablePartitionEvent := findEvent(h.DDLEventCh(), model.ActionReorganizePartition) // Handle the reorganize table partition event. - require.NoError(t, h.HandleDDLEvent(reorganizeTablePartitionEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, reorganizeTablePartitionEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -666,7 +675,8 @@ func TestAlterTablePartitioning(t *testing.T) { alterTablePartitioningEvent := findEvent(h.DDLEventCh(), model.ActionAlterTablePartitioning) // Handle the alter table partitioning event. - require.NoError(t, h.HandleDDLEvent(alterTablePartitioningEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, alterTablePartitioningEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -724,7 +734,8 @@ func TestRemovePartitioning(t *testing.T) { removePartitioningEvent := findEvent(h.DDLEventCh(), model.ActionRemovePartitioning) // Handle the remove partitioning event. - require.NoError(t, h.HandleDDLEvent(removePartitioningEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, removePartitioningEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -794,7 +805,8 @@ func TestDropSchemaEventWithDynamicPartition(t *testing.T) { require.NotNil(t, dropSchemaEvent) // Handle the drop schema event. - require.NoError(t, h.HandleDDLEvent(dropSchemaEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, dropSchemaEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( @@ -846,7 +858,8 @@ func TestDropSchemaEventWithStaticPartition(t *testing.T) { require.NotNil(t, dropSchemaEvent) // Handle the drop schema event. - require.NoError(t, h.HandleDDLEvent(dropSchemaEvent)) + err = statstestutil.HandleDDLEventWithTxn(h, dropSchemaEvent) + require.NoError(t, err) ctx := context.Background() require.NoError(t, statsutil.CallWithSCtx( diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go index 0e32590ff6aa3..f12fc7df42378 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/priorityqueue" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) @@ -642,7 +643,8 @@ func TestPQHandlesTableDeletionGracefully(t *testing.T) { tk.MustExec("drop table t1") deleteEvent := findEvent(handle.DDLEventCh(), model.ActionDropTable) require.NotNil(t, deleteEvent) - require.NoError(t, handle.HandleDDLEvent(deleteEvent)) + err = statstestutil.HandleDDLEventWithTxn(handle, deleteEvent) + require.NoError(t, err) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) // Make sure handle.Get() returns false. diff --git a/pkg/statistics/handle/ddl/BUILD.bazel b/pkg/statistics/handle/ddl/BUILD.bazel index 3851b8590f1e7..991a65647abe1 100644 --- a/pkg/statistics/handle/ddl/BUILD.bazel +++ b/pkg/statistics/handle/ddl/BUILD.bazel @@ -4,11 +4,7 @@ go_library( name = "ddl", srcs = [ "ddl.go", - "drop_partition.go", - "exchange_partition.go", - "reorganize_partition.go", "subscriber.go", - "truncate_partition.go", ], importpath = "github.com/pingcap/tidb/pkg/statistics/handle/ddl", visibility = ["//visibility:public"], @@ -42,6 +38,7 @@ go_test( "//pkg/meta/model", "//pkg/parser/model", "//pkg/planner/cardinality", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/storage", "//pkg/statistics/handle/util", "//pkg/testkit", diff --git a/pkg/statistics/handle/ddl/ddl.go b/pkg/statistics/handle/ddl/ddl.go index 62d3a60603436..2cc06ce54316d 100644 --- a/pkg/statistics/handle/ddl/ddl.go +++ b/pkg/statistics/handle/ddl/ddl.go @@ -19,22 +19,18 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/notifier" - "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" - "github.com/pingcap/tidb/pkg/statistics/handle/logutil" "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/util/intest" - "go.uber.org/zap" ) type ddlHandlerImpl struct { ddlEventCh chan *notifier.SchemaChangeEvent statsWriter types.StatsReadWriter statsHandler types.StatsHandle + sub *subscriber } // NewDDLHandler creates a new ddl handler. @@ -46,158 +42,18 @@ func NewDDLHandler( ddlEventCh: make(chan *notifier.SchemaChangeEvent, 1000), statsWriter: statsWriter, statsHandler: statsHandler, + sub: newSubscriber(statsHandler), } } // HandleDDLEvent begins to process a ddl task. -func (h *ddlHandlerImpl) HandleDDLEvent(s *notifier.SchemaChangeEvent) error { - switch s.GetType() { - case model.ActionCreateTable: - newTableInfo := s.GetCreateTableInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { - return err - } - } - case model.ActionTruncateTable: - newTableInfo, droppedTableInfo := s.GetTruncateTableInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { - return err - } - } - - // Remove the old table stats. - droppedIDs, err := h.getTableIDs(droppedTableInfo) - if err != nil { - return err - } - for _, id := range droppedIDs { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil { - return err - } - } - case model.ActionDropTable: - droppedTableInfo := s.GetDropTableInfo() - ids, err := h.getTableIDs(droppedTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil { - return err - } - } - case model.ActionAddColumn: - newTableInfo, newColumnInfo := s.GetAddColumnInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertColStats2KV(id, newColumnInfo); err != nil { - return err - } - } - case model.ActionModifyColumn: - newTableInfo, modifiedColumnInfo := s.GetModifyColumnInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertColStats2KV(id, modifiedColumnInfo); err != nil { - return err - } - } - case model.ActionAddTablePartition: - globalTableInfo, addedPartitionInfo := s.GetAddPartitionInfo() - for _, def := range addedPartitionInfo.Definitions { - if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil { - return err - } - } - case model.ActionTruncateTablePartition: - if err := h.onTruncatePartitions(s); err != nil { - return err - } - case model.ActionDropTablePartition: - if err := h.onDropPartitions(s); err != nil { - return err - } - // EXCHANGE PARTITION EVENT NOTES: - // 1. When a partition is exchanged with a system table, we need to adjust the global statistics - // based on the count delta and modify count delta. However, due to the involvement of the system table, - // a complete update of the global statistics is not feasible. Therefore, we bypass the statistics update - // for the table in this scenario. Despite this, the table id still changes, so the statistics for the - // system table will still be visible. - // 2. If the system table is a partitioned table, we will update the global statistics for the partitioned table. - // It is rare to exchange a partition from a system table, so we can ignore this case. In this case, - // the system table will have statistics, but this is not a significant issue. - // So we decided to completely ignore the system table event. - case model.ActionExchangeTablePartition: - if err := h.onExchangeAPartition(s); err != nil { - return err - } - case model.ActionReorganizePartition: - if err := h.onReorganizePartitions(s); err != nil { - return err - } - case model.ActionAlterTablePartitioning: - oldSingleTableID, globalTableInfo, addedPartInfo := s.GetAddPartitioningInfo() - // Add new partition stats. - for _, def := range addedPartInfo.Definitions { - if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil { - return err - } - } - // Change id for global stats, since the data has not changed! - // Note: This operation will update all tables related to statistics with the new ID. - return h.statsWriter.ChangeGlobalStatsID(oldSingleTableID, globalTableInfo.ID) - case model.ActionRemovePartitioning: - // Change id for global stats, since the data has not changed! - // Note: This operation will update all tables related to statistics with the new ID. - oldTblID, newSingleTableInfo, droppedPartInfo := s.GetRemovePartitioningInfo() - if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil { - return err - } +func (h *ddlHandlerImpl) HandleDDLEvent(ctx context.Context, sctx sessionctx.Context, s *notifier.SchemaChangeEvent) error { + return h.sub.handle(ctx, sctx, s) +} - // Remove partition stats. - for _, def := range droppedPartInfo.Definitions { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil { - return err - } - } - case model.ActionFlashbackCluster: - return h.statsWriter.UpdateStatsVersion() - case model.ActionAddIndex: - // No need to update the stats meta for the adding index event. - case model.ActionDropSchema: - miniDBInfo := s.GetDropSchemaInfo() - intest.Assert(miniDBInfo != nil) - for _, table := range miniDBInfo.Tables { - // Try best effort to update the stats meta version for gc. - if err := h.statsWriter.UpdateStatsMetaVersionForGC(table.ID); err != nil { - logutil.StatsLogger().Error( - "Failed to update stats meta version for gc", - zap.Int64("tableID", table.ID), - zap.Error(err), - ) - } - } - default: - intest.Assert(false) - logutil.StatsLogger().Error("Unhandled schema change event", zap.Stringer("type", s)) - } - return nil +// DDLEventCh returns ddl events channel in handle. +func (h *ddlHandlerImpl) DDLEventCh() chan *notifier.SchemaChangeEvent { + return h.ddlEventCh } // UpdateStatsWithCountDeltaAndModifyCountDeltaForTest updates the global stats with the given count delta and modify count delta. @@ -292,27 +148,3 @@ func updateStatsWithCountDeltaAndModifyCountDelta( return err } - -func (h *ddlHandlerImpl) getTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) { - pi := tblInfo.GetPartitionInfo() - if pi == nil { - return []int64{tblInfo.ID}, nil - } - ids = make([]int64, 0, len(pi.Definitions)+1) - for _, def := range pi.Definitions { - ids = append(ids, def.ID) - } - pruneMode, err := util.GetCurrentPruneMode(h.statsHandler.SPool()) - if err != nil { - return nil, err - } - if variable.PartitionPruneMode(pruneMode) == variable.Dynamic { - ids = append(ids, tblInfo.ID) - } - return ids, nil -} - -// DDLEventCh returns ddl events channel in handle. -func (h *ddlHandlerImpl) DDLEventCh() chan *notifier.SchemaChangeEvent { - return h.ddlEventCh -} diff --git a/pkg/statistics/handle/ddl/ddl_test.go b/pkg/statistics/handle/ddl/ddl_test.go index 5cfe11a0968bd..aa3e88fd49b19 100644 --- a/pkg/statistics/handle/ddl/ddl_test.go +++ b/pkg/statistics/handle/ddl/ddl_test.go @@ -24,6 +24,7 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/planner/cardinality" "github.com/pingcap/tidb/pkg/statistics/handle/ddl" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/storage" statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" @@ -76,7 +77,7 @@ func TestDDLTable(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) statsTbl := h.GetTableStats(tableInfo) @@ -87,7 +88,7 @@ func TestDDLTable(t *testing.T) { tbl, err = is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t1")) require.NoError(t, err) tableInfo = tbl.Meta() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) statsTbl = h.GetTableStats(tableInfo) @@ -100,7 +101,7 @@ func TestDDLTable(t *testing.T) { tbl, err = is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t_parent")) require.NoError(t, err) tableInfo = tbl.Meta() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) statsTbl = h.GetTableStats(tableInfo) @@ -111,7 +112,7 @@ func TestDDLTable(t *testing.T) { tbl, err = is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t_child")) require.NoError(t, err) tableInfo = tbl.Meta() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) statsTbl = h.GetTableStats(tableInfo) @@ -186,7 +187,7 @@ func TestTruncateTable(t *testing.T) { // Find the truncate table partition event. truncateTableEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTable) - err = h.HandleDDLEvent(truncateTableEvent) + err = statstestutil.HandleDDLEventWithTxn(h, truncateTableEvent) require.NoError(t, err) // Get new table info. @@ -259,7 +260,7 @@ func TestTruncateAPartitionedTable(t *testing.T) { testKit.MustExec("truncate table t") // Find the truncate table event. truncateTableEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTable) - err = h.HandleDDLEvent(truncateTableEvent) + err = statstestutil.HandleDDLEventWithTxn(h, truncateTableEvent) require.NoError(t, err) // Get new table info. @@ -300,7 +301,7 @@ func TestDDLHistogram(t *testing.T) { testKit.MustExec("analyze table t") testKit.MustExec("alter table t add column c_null int") - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is := do.InfoSchema() require.Nil(t, h.Update(context.Background(), is)) @@ -315,7 +316,7 @@ func TestDDLHistogram(t *testing.T) { require.Equal(t, int64(0), statsTbl.GetCol(tableInfo.Columns[2].ID).Histogram.NDV) testKit.MustExec("alter table t add column c3 int NOT NULL") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is = do.InfoSchema() require.Nil(t, h.Update(context.Background(), is)) @@ -335,7 +336,7 @@ func TestDDLHistogram(t *testing.T) { require.Equal(t, float64(0), count) testKit.MustExec("alter table t add column c4 datetime NOT NULL default CURRENT_TIMESTAMP") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is = do.InfoSchema() require.Nil(t, h.Update(context.Background(), is)) @@ -348,7 +349,7 @@ func TestDDLHistogram(t *testing.T) { require.True(t, statsTbl.ColAndIdxExistenceMap.HasAnalyzed(4, false)) testKit.MustExec("alter table t add column c5 varchar(15) DEFAULT '123'") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is = do.InfoSchema() require.Nil(t, h.Update(context.Background(), is)) @@ -362,7 +363,7 @@ func TestDDLHistogram(t *testing.T) { require.Equal(t, 3.0, cardinality.AvgColSize(statsTbl.GetCol(tableInfo.Columns[5].ID), statsTbl.RealtimeCount, false)) testKit.MustExec("alter table t add column c6 varchar(15) DEFAULT '123', add column c7 varchar(15) DEFAULT '123'") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is = do.InfoSchema() require.Nil(t, h.Update(context.Background(), is)) @@ -402,7 +403,7 @@ func TestDDLPartition(t *testing.T) { testKit.MustExec("drop table if exists t") h := do.StatsHandle() if i == 1 { - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) } createTable := `CREATE TABLE t (a int, b int, primary key(a), index idx(b)) @@ -417,7 +418,7 @@ PARTITION BY RANGE ( a ) ( tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) tableInfo := tbl.Meta() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) pi := tableInfo.GetPartitionInfo() @@ -429,7 +430,7 @@ PARTITION BY RANGE ( a ) ( testKit.MustExec("insert into t values (1,2),(6,2),(11,2),(16,2)") testKit.MustExec("analyze table t") testKit.MustExec("alter table t add column c varchar(15) DEFAULT '123'") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is = do.InfoSchema() require.Nil(t, h.Update(context.Background(), is)) @@ -449,7 +450,7 @@ PARTITION BY RANGE ( a ) ( tbl, err = is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) tableInfo = tbl.Meta() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) pi = tableInfo.GetPartitionInfo() @@ -516,7 +517,7 @@ func TestReorgPartitions(t *testing.T) { testKit.MustExec("alter table t reorganize partition p0, p1 into (partition p0 values less than (11))") // Find the reorganize partition event. reorganizePartitionEvent := findEvent(h.DDLEventCh(), model.ActionReorganizePartition) - err = h.HandleDDLEvent(reorganizePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, reorganizePartitionEvent) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) @@ -568,7 +569,7 @@ func TestIncreasePartitionCountOfHashPartitionTable(t *testing.T) { testKit.MustExec("alter table t add partition partitions 2") // Find the reorganize partition event. reorganizePartitionEvent := findEvent(h.DDLEventCh(), model.ActionReorganizePartition) - err = h.HandleDDLEvent(reorganizePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, reorganizePartitionEvent) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) @@ -642,7 +643,7 @@ func TestDecreasePartitionCountOfHashPartitionTable(t *testing.T) { testKit.MustExec("alter table t coalesce partition 2") // Find the reorganize partition event. reorganizePartitionEvent := findEvent(h.DDLEventCh(), model.ActionReorganizePartition) - err = h.HandleDDLEvent(reorganizePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, reorganizePartitionEvent) require.NoError(t, err) require.Nil(t, h.Update(context.Background(), is)) @@ -722,7 +723,7 @@ func TestTruncateAPartition(t *testing.T) { testKit.MustExec("alter table t truncate partition p0") // Find the truncate partition event. truncatePartitionEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTablePartition) - err = h.HandleDDLEvent(truncatePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, truncatePartitionEvent) require.NoError(t, err) // Check global stats meta. // Because we have truncated a partition, the count should be 5 - 2 = 3 and the modify count should be 2. @@ -785,7 +786,7 @@ func TestTruncateAHashPartition(t *testing.T) { testKit.MustExec("alter table t truncate partition p0") // Find the truncate partition event. truncatePartitionEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTablePartition) - err = h.HandleDDLEvent(truncatePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, truncatePartitionEvent) require.NoError(t, err) // Check global stats meta. // Because we have truncated a partition, the count should be 5 - 1 = 4 and the modify count should be 1. @@ -855,7 +856,7 @@ func TestTruncatePartitions(t *testing.T) { testKit.MustExec("alter table t truncate partition p0, p1") // Find the truncate partition event. truncatePartitionEvent := findEvent(h.DDLEventCh(), model.ActionTruncateTablePartition) - err = h.HandleDDLEvent(truncatePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, truncatePartitionEvent) require.NoError(t, err) // Check global stats meta. // Because we have truncated two partitions, the count should be 5 - 2 - 1 = 2 and the modify count should be 3. @@ -915,16 +916,6 @@ func TestDropAPartition(t *testing.T) { // Find the drop partition event. dropPartitionEvent := findEvent(h.DDLEventCh(), model.ActionDropTablePartition) - err = h.HandleDDLEvent(dropPartitionEvent) - require.NoError(t, err) - // Check the global stats meta. - // Because we have dropped a partition, the count should be 3 and the modify count should be 2. - testKit.MustQuery( - "select count, modify_count from mysql.stats_meta where table_id = ?", tableInfo.ID, - ).Check( - testkit.Rows("3 2"), - ) - // Get partition p0's stats update version. partitionID := pi.Definitions[0].ID // Get it from stats_meta first. @@ -934,6 +925,16 @@ func TestDropAPartition(t *testing.T) { require.Len(t, rows, 1) version := rows[0][0].(string) + err = statstestutil.HandleDDLEventWithTxn(h, dropPartitionEvent) + require.NoError(t, err) + // Check the global stats meta. + // Because we have dropped a partition, the count should be 3 and the modify count should be 2. + testKit.MustQuery( + "select count, modify_count from mysql.stats_meta where table_id = ?", tableInfo.ID, + ).Check( + testkit.Rows("3 2"), + ) + // Check the update version is changed. rows = testKit.MustQuery( "select version from mysql.stats_meta where table_id = ?", tableInfo.ID, @@ -995,7 +996,7 @@ func TestDropPartitions(t *testing.T) { // Find the drop partition event. dropPartitionEvent := findEvent(h.DDLEventCh(), model.ActionDropTablePartition) - err = h.HandleDDLEvent(dropPartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, dropPartitionEvent) require.NoError(t, err) // Check the global stats meta. @@ -1082,7 +1083,7 @@ func TestExchangeAPartition(t *testing.T) { testKit.MustExec("alter table t exchange partition p0 with table t1") // Find the exchange partition event. exchangePartitionEvent := findEvent(h.DDLEventCh(), model.ActionExchangeTablePartition) - err = h.HandleDDLEvent(exchangePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, exchangePartitionEvent) require.NoError(t, err) // Check the global stats meta. // Because we have exchanged a partition, the count should be 5 and the modify count should be 5(table) + 2(partition). @@ -1123,7 +1124,7 @@ func TestExchangeAPartition(t *testing.T) { testKit.MustExec("alter table t exchange partition p1 with table t2") // Find the exchange partition event. exchangePartitionEvent = findEvent(h.DDLEventCh(), model.ActionExchangeTablePartition) - err = h.HandleDDLEvent(exchangePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, exchangePartitionEvent) require.NoError(t, err) // Check the global stats meta. testKit.MustQuery( @@ -1158,7 +1159,7 @@ func TestExchangeAPartition(t *testing.T) { testKit.MustExec(fmt.Sprintf("delete from mysql.stats_meta where table_id = %d", tableInfo.ID)) // Find the exchange partition event. exchangePartitionEvent = findEvent(h.DDLEventCh(), model.ActionExchangeTablePartition) - err = h.HandleDDLEvent(exchangePartitionEvent) + err = statstestutil.HandleDDLEventWithTxn(h, exchangePartitionEvent) require.NoError(t, err) // Check the global stats meta. testKit.MustQuery( @@ -1226,7 +1227,7 @@ func TestRemovePartitioning(t *testing.T) { testKit.MustExec("alter table t remove partitioning") // Find the remove partitioning event. removePartitioningEvent := findEvent(h.DDLEventCh(), model.ActionRemovePartitioning) - err = h.HandleDDLEvent(removePartitioningEvent) + err = statstestutil.HandleDDLEventWithTxn(h, removePartitioningEvent) require.NoError(t, err) // Check the global stats meta make sure the count and modify count are not changed. // Get new table id after remove partitioning. @@ -1289,7 +1290,7 @@ func TestAddPartitioning(t *testing.T) { testKit.MustExec("alter table t partition by hash(a) partitions 3") // Find the add partitioning event. addPartitioningEvent := findEvent(h.DDLEventCh(), model.ActionAlterTablePartitioning) - err = h.HandleDDLEvent(addPartitioningEvent) + err = statstestutil.HandleDDLEventWithTxn(h, addPartitioningEvent) require.NoError(t, err) // Check the global stats meta make sure the count and modify count are not changed. // Get new table id after remove partitioning. @@ -1332,7 +1333,7 @@ func TestDropSchema(t *testing.T) { // Handle the drop schema event. dropSchemaEvent := findEvent(h.DDLEventCh(), model.ActionDropSchema) - err = h.HandleDDLEvent(dropSchemaEvent) + err = statstestutil.HandleDDLEventWithTxn(h, dropSchemaEvent) require.NoError(t, err) // Check the stats meta version after drop schema. diff --git a/pkg/statistics/handle/ddl/drop_partition.go b/pkg/statistics/handle/ddl/drop_partition.go deleted file mode 100644 index 439b65c1b9469..0000000000000 --- a/pkg/statistics/handle/ddl/drop_partition.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ddl - -import ( - "github.com/pingcap/tidb/pkg/ddl/notifier" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/statistics/handle/util" -) - -func (h *ddlHandlerImpl) onDropPartitions(t *notifier.SchemaChangeEvent) error { - globalTableInfo, droppedPartitionInfo := t.GetDropPartitionInfo() - // Note: Put all the operations in a transaction. - if err := util.CallWithSCtx(h.statsHandler.SPool(), func(sctx sessionctx.Context) error { - return updateGlobalTableStats4DropPartition(util.StatsCtx, sctx, globalTableInfo, droppedPartitionInfo) - }, util.FlagWrapTxn); err != nil { - return err - } - - // Reset the partition stats. - // It's OK to put those operations in different transactions. Because it will not affect the correctness. - for _, def := range droppedPartitionInfo.Definitions { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil { - return err - } - } - - return nil -} diff --git a/pkg/statistics/handle/ddl/exchange_partition.go b/pkg/statistics/handle/ddl/exchange_partition.go deleted file mode 100644 index 1b8488b9eb4c9..0000000000000 --- a/pkg/statistics/handle/ddl/exchange_partition.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ddl - -import ( - "github.com/pingcap/tidb/pkg/ddl/notifier" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/statistics/handle/util" -) - -func (h *ddlHandlerImpl) onExchangeAPartition(t *notifier.SchemaChangeEvent) error { - globalTableInfo, originalPartInfo, - originalTableInfo := t.GetExchangePartitionInfo() - // Note: Put all the operations in a transaction. - return util.CallWithSCtx(h.statsHandler.SPool(), func(sctx sessionctx.Context) error { - return updateGlobalTableStats4ExchangePartition( - util.StatsCtx, - sctx, - globalTableInfo, - originalPartInfo, - originalTableInfo, - ) - }, util.FlagWrapTxn) -} diff --git a/pkg/statistics/handle/ddl/reorganize_partition.go b/pkg/statistics/handle/ddl/reorganize_partition.go deleted file mode 100644 index 4d039fc0e3277..0000000000000 --- a/pkg/statistics/handle/ddl/reorganize_partition.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ddl - -import ( - "github.com/pingcap/tidb/pkg/ddl/notifier" -) - -func (h *ddlHandlerImpl) onReorganizePartitions(t *notifier.SchemaChangeEvent) error { - globalTableInfo, - addedPartInfo, - droppedPartitionInfo := t.GetReorganizePartitionInfo() - // Avoid updating global stats as the data remains unchanged. - // For new partitions, it's crucial to correctly insert the count and modify count correctly. - // However, this is challenging due to the need to know the count of the new partitions. - // Given that a partition can be split into two, determining the count of the new partitions is so hard. - // It's acceptable to not update it immediately, - // as the new partitions will be analyzed shortly due to the absence of statistics for them. - // Therefore, the auto-analyze worker will handle them in the near future. - for _, def := range addedPartInfo.Definitions { - if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil { - return err - } - } - - // Reset the partition stats. - // It's OK to put those operations in different transactions. Because it will not affect the correctness. - for _, def := range droppedPartitionInfo.Definitions { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil { - return err - } - } - - return nil -} diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go index 2004c5f83ca2d..a4af15a1815d0 100644 --- a/pkg/statistics/handle/ddl/subscriber.go +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -37,8 +37,8 @@ type subscriber struct { statsCache types.StatsCache } -// NewSubscriber creates a new subscriber. -func NewSubscriber( +// newSubscriber creates a new subscriber. +func newSubscriber( statsCache types.StatsCache, ) *subscriber { h := subscriber{statsCache: statsCache} @@ -174,6 +174,16 @@ func (h subscriber) handle( } return nil + // EXCHANGE PARTITION EVENT NOTES: + // 1. When a partition is exchanged with a system table, we need to adjust the global statistics + // based on the count delta and modify count delta. However, due to the involvement of the system table, + // a complete update of the global statistics is not feasible. Therefore, we bypass the statistics update + // for the table in this scenario. Despite this, the table id still changes, so the statistics for the + // system table will still be visible. + // 2. If the system table is a partitioned table, we will update the global statistics for the partitioned table. + // It is rare to exchange a partition from a system table, so we can ignore this case. In this case, + // the system table will have statistics, but this is not a significant issue. + // So we decided to completely ignore the system table event. case model.ActionExchangeTablePartition: globalTableInfo, originalPartInfo, originalTableInfo := change.GetExchangePartitionInfo() return errors.Trace(updateGlobalTableStats4ExchangePartition( diff --git a/pkg/statistics/handle/ddl/testutil/BUILD.bazel b/pkg/statistics/handle/ddl/testutil/BUILD.bazel new file mode 100644 index 0000000000000..2ec35ac66d0e1 --- /dev/null +++ b/pkg/statistics/handle/ddl/testutil/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "testutil", + srcs = ["util.go"], + importpath = "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil", + visibility = ["//visibility:public"], + deps = [ + "//pkg/ddl/notifier", + "//pkg/kv", + "//pkg/sessionctx", + "//pkg/statistics/handle", + "//pkg/statistics/handle/util", + ], +) diff --git a/pkg/statistics/handle/ddl/testutil/util.go b/pkg/statistics/handle/ddl/testutil/util.go new file mode 100644 index 0000000000000..2900bcc7769c5 --- /dev/null +++ b/pkg/statistics/handle/ddl/testutil/util.go @@ -0,0 +1,38 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutil + +import ( + "context" + + "github.com/pingcap/tidb/pkg/ddl/notifier" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/statistics/handle" + statsutil "github.com/pingcap/tidb/pkg/statistics/handle/util" +) + +// HandleDDLEventWithTxn wraps the common pattern of handling DDL events with a transaction +func HandleDDLEventWithTxn(h *handle.Handle, event *notifier.SchemaChangeEvent) error { + return statsutil.CallWithSCtx(h.SPool(), func(sctx sessionctx.Context) error { + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalDDLNotifier) + return h.HandleDDLEvent(ctx, sctx, event) + }, statsutil.FlagWrapTxn) +} + +// HandleNextDDLEventWithTxn handles the next DDL event from the channel with a transaction +func HandleNextDDLEventWithTxn(h *handle.Handle) error { + return HandleDDLEventWithTxn(h, <-h.DDLEventCh()) +} diff --git a/pkg/statistics/handle/ddl/truncate_partition.go b/pkg/statistics/handle/ddl/truncate_partition.go deleted file mode 100644 index f3708167e6f1d..0000000000000 --- a/pkg/statistics/handle/ddl/truncate_partition.go +++ /dev/null @@ -1,54 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ddl - -import ( - "github.com/pingcap/tidb/pkg/ddl/notifier" - "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/statistics/handle/util" -) - -func (h *ddlHandlerImpl) onTruncatePartitions(t *notifier.SchemaChangeEvent) error { - globalTableInfo, addedPartInfo, droppedPartInfo := t.GetTruncatePartitionInfo() - // First, add the new stats meta record for the new partitions. - for _, def := range addedPartInfo.Definitions { - if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil { - return err - } - } - - // Second, clean up the old stats meta from global stats meta for the dropped partitions. - // Do not forget to put those operations in one transaction. - if err := util.CallWithSCtx(h.statsHandler.SPool(), func(sctx sessionctx.Context) error { - return updateGlobalTableStats4TruncatePartition( - util.StatsCtx, - sctx, - globalTableInfo, - droppedPartInfo, - ) - }, util.FlagWrapTxn); err != nil { - return err - } - - // Third, clean up the old stats meta from partition stats meta for the dropped partitions. - // It's OK to put those operations in different transactions. Because it will not affect the correctness. - for _, def := range droppedPartInfo.Definitions { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil { - return err - } - } - - return nil -} diff --git a/pkg/statistics/handle/globalstats/BUILD.bazel b/pkg/statistics/handle/globalstats/BUILD.bazel index c6b0b1abdc330..b02d0225b47ae 100644 --- a/pkg/statistics/handle/globalstats/BUILD.bazel +++ b/pkg/statistics/handle/globalstats/BUILD.bazel @@ -54,6 +54,7 @@ go_test( "//pkg/session", "//pkg/sessionctx/stmtctx", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/testkit/testsetup", "//pkg/types", diff --git a/pkg/statistics/handle/globalstats/global_stats_test.go b/pkg/statistics/handle/globalstats/global_stats_test.go index be8435ba0a647..4564af9fa24ea 100644 --- a/pkg/statistics/handle/globalstats/global_stats_test.go +++ b/pkg/statistics/handle/globalstats/global_stats_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/session" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" ) @@ -511,7 +512,8 @@ partition by range (a) ( partition p0 values less than (10), partition p1 values less than (20) )`) - require.NoError(t, dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) + require.NoError(t, err) tk.MustExec("insert into t values (1), (5), (null), (11), (15)") require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) @@ -522,7 +524,7 @@ partition by range (a) ( tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") tk.MustExec("set @@session.tidb_analyze_version=1") - err := tk.ExecToErr("analyze table t") // try to build global-stats on ver1 + err = tk.ExecToErr("analyze table t") // try to build global-stats on ver1 require.NoError(t, err) tk.MustExec("set @@tidb_partition_prune_mode='dynamic'") @@ -585,7 +587,7 @@ func TestDDLPartition4GlobalStats(t *testing.T) { do := dom is := do.InfoSchema() h := do.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.Update(context.Background(), is)) tk.MustExec("insert into t values (1), (2), (3), (4), (5), " + @@ -604,7 +606,8 @@ func TestDDLPartition4GlobalStats(t *testing.T) { tk.MustExec("alter table t truncate partition p2, p4;") require.NoError(t, h.DumpStatsDeltaToKV(true)) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.Update(context.Background(), is)) // We will update the global-stats after the truncate operation. globalStats = h.GetTableStats(tableInfo) @@ -865,7 +868,8 @@ func TestGlobalIndexStatistics(t *testing.T) { // analyze table t tk.MustExec("drop table if exists t") if i != 0 { - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) } tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, key(a) )" + "PARTITION BY RANGE (a) (" + @@ -873,7 +877,8 @@ func TestGlobalIndexStatistics(t *testing.T) { "PARTITION p1 VALUES LESS THAN (20)," + "PARTITION p2 VALUES LESS THAN (30)," + "PARTITION p3 VALUES LESS THAN (40))") - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t(a,b) values (1,1), (2,2), (3,3), (15,15), (25,25), (35,35)") tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") <-h.DDLEventCh() @@ -888,14 +893,16 @@ func TestGlobalIndexStatistics(t *testing.T) { // analyze table t index idx tk.MustExec("drop table if exists t") - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, primary key(b, a) clustered)" + "PARTITION BY RANGE (a) (" + "PARTITION p0 VALUES LESS THAN (10)," + "PARTITION p1 VALUES LESS THAN (20)," + "PARTITION p2 VALUES LESS THAN (30)," + "PARTITION p3 VALUES LESS THAN (40));") - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t(a,b) values (1,1), (2,2), (3,3), (15,15), (25,25), (35,35)") tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") <-h.DDLEventCh() @@ -907,14 +914,16 @@ func TestGlobalIndexStatistics(t *testing.T) { // analyze table t index tk.MustExec("drop table if exists t") - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("CREATE TABLE t ( a int, b int, c int default 0, primary key(b, a) clustered )" + "PARTITION BY RANGE (a) (" + "PARTITION p0 VALUES LESS THAN (10)," + "PARTITION p1 VALUES LESS THAN (20)," + "PARTITION p2 VALUES LESS THAN (30)," + "PARTITION p3 VALUES LESS THAN (40));") - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t(a,b) values (1,1), (2,2), (3,3), (15,15), (25,25), (35,35)") tk.MustExec("ALTER TABLE t ADD UNIQUE INDEX idx(b) GLOBAL") <-h.DDLEventCh() diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index d986dfdafba22..9d63a732bce55 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/util" pkgutil "github.com/pingcap/tidb/pkg/util" + "github.com/pingcap/tidb/pkg/util/intest" "go.uber.org/zap" ) @@ -146,6 +147,14 @@ func NewHandle( handle.StatsReadWriter, handle, ) + if ddlNotifier != nil { + // In test environments, we use a channel-based approach to handle DDL events. + // This maintains compatibility with existing test cases that expect events to be delivered through channels. + // In production, DDL events are handled by the notifier system instead. + if !intest.InTest { + ddlNotifier.RegisterHandler(notifier.StatsMetaHandlerID, handle.DDL.HandleDDLEvent) + } + } return handle, nil } @@ -197,12 +206,6 @@ func (h *Handle) getPartitionStats(tblInfo *model.TableInfo, pid int64, returnPs // FlushStats flushes the cached stats update into store. func (h *Handle) FlushStats() { - for len(h.DDLEventCh()) > 0 { - e := <-h.DDLEventCh() - if err := h.HandleDDLEvent(e); err != nil { - statslogutil.StatsLogger().Error("handle ddl event fail", zap.Error(err)) - } - } if err := h.DumpStatsDeltaToKV(true); err != nil { statslogutil.StatsLogger().Error("dump stats delta fail", zap.Error(err)) } diff --git a/pkg/statistics/handle/handletest/BUILD.bazel b/pkg/statistics/handle/handletest/BUILD.bazel index 2884f52b182f1..1928e1250a185 100644 --- a/pkg/statistics/handle/handletest/BUILD.bazel +++ b/pkg/statistics/handle/handletest/BUILD.bazel @@ -18,6 +18,7 @@ go_test( "//pkg/sessionctx/variable", "//pkg/statistics", "//pkg/statistics/handle", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/util", "//pkg/testkit", "//pkg/testkit/analyzehelper", diff --git a/pkg/statistics/handle/handletest/handle_test.go b/pkg/statistics/handle/handletest/handle_test.go index 2fea656d8ae92..84563dec1fc33 100644 --- a/pkg/statistics/handle/handletest/handle_test.go +++ b/pkg/statistics/handle/handletest/handle_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/analyzehelper" @@ -210,7 +211,7 @@ func TestLoadHist(t *testing.T) { testKit.MustExec("create table t (c1 varchar(12), c2 char(12))") do := dom h := do.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) rowCount := 10 for i := 0; i < rowCount; i++ { @@ -246,7 +247,7 @@ func TestLoadHist(t *testing.T) { }) // Add column c3, we only update c3. testKit.MustExec("alter table t add column c3 int") - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) is = do.InfoSchema() tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) @@ -1176,7 +1177,7 @@ func testIncrementalModifyCountUpdateHelper(analyzeSnapshot bool) func(*testing. analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a") tk.MustExec("set @@session.tidb_analyze_version = 2") h := dom.StatsHandle() - err := h.HandleDDLEvent(<-h.DDLEventCh()) + err := statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) tbl, err := dom.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) @@ -1304,7 +1305,8 @@ func TestUninitializedStatsStatus(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("create table t(a int, b int, c int, index idx_a(a))") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1,2,2), (3,4,4), (5,6,6), (7,8,8), (9,10,10)") require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() diff --git a/pkg/statistics/handle/handletest/statstest/BUILD.bazel b/pkg/statistics/handle/handletest/statstest/BUILD.bazel index 36c52aa38cf04..096e8ebac5d22 100644 --- a/pkg/statistics/handle/handletest/statstest/BUILD.bazel +++ b/pkg/statistics/handle/handletest/statstest/BUILD.bazel @@ -15,6 +15,7 @@ go_test( "//pkg/parser/model", "//pkg/parser/mysql", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/internal", "//pkg/testkit", "//pkg/testkit/analyzehelper", diff --git a/pkg/statistics/handle/handletest/statstest/stats_test.go b/pkg/statistics/handle/handletest/statstest/stats_test.go index 728f89a69e07e..9fbde012bd3f1 100644 --- a/pkg/statistics/handle/handletest/statstest/stats_test.go +++ b/pkg/statistics/handle/handletest/statstest/stats_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/internal" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/analyzehelper" @@ -387,13 +388,15 @@ func initStatsVer2(t *testing.T) { tk.MustExec("use test") tk.MustExec("set @@session.tidb_analyze_version=2") tk.MustExec("create table t(a int, b int, c int, d int, index idx(a), index idxab(a, b))") - dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()) + h := dom.StatsHandle() + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "c") tk.MustExec("insert into t values(1, 1, 1, 1), (2, 2, 2, 2), (3, 3, 3, 3), (4, 4, 4, 4), (4, 4, 4, 4), (4, 4, 4, 4)") tk.MustExec("analyze table t with 2 topn, 3 buckets") tk.MustExec("alter table t add column e int default 1") - dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()) - h := dom.StatsHandle() + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) is := dom.InfoSchema() tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) diff --git a/pkg/statistics/handle/storage/BUILD.bazel b/pkg/statistics/handle/storage/BUILD.bazel index 72412b217a56d..72070ce6e1554 100644 --- a/pkg/statistics/handle/storage/BUILD.bazel +++ b/pkg/statistics/handle/storage/BUILD.bazel @@ -68,6 +68,7 @@ go_test( "//pkg/planner/cardinality", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/internal", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", diff --git a/pkg/statistics/handle/storage/dump_test.go b/pkg/statistics/handle/storage/dump_test.go index 4c42b8ebb8f52..5a8dcb65fbc05 100644 --- a/pkg/statistics/handle/storage/dump_test.go +++ b/pkg/statistics/handle/storage/dump_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/internal" "github.com/pingcap/tidb/pkg/statistics/handle/storage" statstypes "github.com/pingcap/tidb/pkg/statistics/handle/types" @@ -633,7 +634,8 @@ func TestLoadStatsFromOldVersion(t *testing.T) { tk.MustExec("create table t(a int, b int, index idx(b))") h := dom.StatsHandle() is := dom.InfoSchema() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.Update(context.Background(), is)) statsJSONFromOldVersion := `{ diff --git a/pkg/statistics/handle/storage/gc_test.go b/pkg/statistics/handle/storage/gc_test.go index 3c480100e54fb..b82e5dceb8d95 100644 --- a/pkg/statistics/handle/storage/gc_test.go +++ b/pkg/statistics/handle/storage/gc_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx/variable" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/analyzehelper" "github.com/stretchr/testify/require" @@ -107,7 +108,8 @@ func TestGCExtendedStats(t *testing.T) { testKit.MustExec("alter table t add stats_extended s1 correlation(a,b)") testKit.MustExec("alter table t add stats_extended s2 correlation(b,c)") h := dom.StatsHandle() - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) testKit.MustExec("analyze table t") testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( @@ -134,7 +136,8 @@ func TestGCExtendedStats(t *testing.T) { testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( "s2 2 [2,3] 1.000000 1", )) - require.Nil(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.Nil(t, h.GCStats(dom.InfoSchema(), ddlLease)) testKit.MustQuery("select name, type, column_ids, stats, status from mysql.stats_extended").Sort().Check(testkit.Rows( "s2 2 [2,3] 1.000000 2", diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel index e220c1aadfeec..04148626bece3 100644 --- a/pkg/statistics/handle/syncload/BUILD.bazel +++ b/pkg/statistics/handle/syncload/BUILD.bazel @@ -43,6 +43,7 @@ go_test( "//pkg/parser/model", "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/types", "//pkg/testkit", "//pkg/testkit/analyzehelper", diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go index e7860270130ea..dec30cd82bb3f 100644 --- a/pkg/statistics/handle/syncload/stats_syncload_test.go +++ b/pkg/statistics/handle/syncload/stats_syncload_test.go @@ -25,6 +25,7 @@ import ( pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/testkit" @@ -435,7 +436,8 @@ func TestSyncLoadOnObjectWhichCanNotFoundInStorage(t *testing.T) { // Do some DDL, one successfully handled by handleDDLEvent, the other not. tk.MustExec("alter table t add column d int default 2") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) tbl, err = dom.InfoSchema().TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t")) require.NoError(t, err) diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 6cd80e3434731..df66858b9ebf7 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -519,7 +519,7 @@ type StatsGlobal interface { // DDL is used to handle ddl events. type DDL interface { // HandleDDLEvent handles ddl events. - HandleDDLEvent(changeEvent *notifier.SchemaChangeEvent) error + HandleDDLEvent(ctx context.Context, sctx sessionctx.Context, changeEvent *notifier.SchemaChangeEvent) error // DDLEventCh returns ddl events channel in handle. DDLEventCh() chan *notifier.SchemaChangeEvent } diff --git a/pkg/statistics/handle/updatetest/BUILD.bazel b/pkg/statistics/handle/updatetest/BUILD.bazel index b204b0035b10b..73f1c44ff8b50 100644 --- a/pkg/statistics/handle/updatetest/BUILD.bazel +++ b/pkg/statistics/handle/updatetest/BUILD.bazel @@ -17,6 +17,7 @@ go_test( "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/usage", "//pkg/statistics/handle/util", "//pkg/testkit", diff --git a/pkg/statistics/handle/updatetest/update_test.go b/pkg/statistics/handle/updatetest/update_test.go index 2b3766d6d47b6..76f20d0e83f85 100644 --- a/pkg/statistics/handle/updatetest/update_test.go +++ b/pkg/statistics/handle/updatetest/update_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" @@ -63,10 +64,9 @@ func TestSingleSessionInsert(t *testing.T) { require.NoError(t, err) tableInfo1 := tbl1.Meta() h := dom.StatsHandle() - - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -181,7 +181,7 @@ func TestRollback(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := dom.StatsHandle() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) @@ -215,8 +215,7 @@ func TestMultiSession(t *testing.T) { require.NoError(t, err) tableInfo1 := tbl1.Meta() h := dom.StatsHandle() - - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -259,8 +258,7 @@ func TestTxnWithFailure(t *testing.T) { require.NoError(t, err) tableInfo1 := tbl1.Meta() h := dom.StatsHandle() - - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) rowCount1 := 10 @@ -313,7 +311,7 @@ func TestUpdatePartition(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) pi := tableInfo.GetPartitionInfo() require.Len(t, pi.Definitions, 2) @@ -379,8 +377,7 @@ func TestAutoUpdate(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() - - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.Update(context.Background(), is)) stats := h.GetTableStats(tableInfo) @@ -601,7 +598,7 @@ func TestOutOfOrderUpdate(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) // Simulate the case that another tidb has inserted some value, but delta info has not been dumped to kv yet. @@ -799,7 +796,8 @@ func TestAutoUpdatePartitionInDynamicOnlyMode(t *testing.T) { do := dom is := do.InfoSchema() h := do.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) testKit.MustExec("insert into t values (1, 'a'), (2, 'b'), (11, 'c'), (12, 'd'), (21, 'e'), (22, 'f')") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -862,7 +860,8 @@ func TestAutoAnalyzeRatio(t *testing.T) { h := dom.StatsHandle() tk.MustExec("use test") tk.MustExec("create table t (a int, index idx(a))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 19)) require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() @@ -1064,7 +1063,8 @@ func TestStatsLockUnlockForAutoAnalyze(t *testing.T) { h := dom.StatsHandle() tk.MustExec("use test") tk.MustExec("create table t (a int, index idx(a))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 19)) require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() @@ -1139,9 +1139,9 @@ func TestStatsLockForDelta(t *testing.T) { testKit.MustExec("insert into t2 values(1, 2)") } - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) - err = h.HandleDDLEvent(<-h.DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(h) require.NoError(t, err) require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1240,7 +1240,8 @@ func TestNotDumpSysTable(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t1 (a int, b int)") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustQuery("select count(1) from mysql.stats_meta").Check(testkit.Rows("1")) // After executing `delete from mysql.stats_meta`, a delta for mysql.stats_meta is created but it would not be dumped. tk.MustExec("delete from mysql.stats_meta") @@ -1271,7 +1272,7 @@ func TestAutoAnalyzePartitionTableAfterAddingIndex(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t (a int, b int) partition by range (a) (PARTITION p0 VALUES LESS THAN (10), PARTITION p1 VALUES LESS THAN MAXVALUE)") h := dom.StatsHandle() - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + require.NoError(t, statstestutil.HandleNextDDLEventWithTxn(h)) tk.MustExec("insert into t values (1,2), (3,4), (11,12),(13,14)") tk.MustExec("set session tidb_analyze_version = 2") tk.MustExec("set session tidb_partition_prune_mode = 'dynamic'") diff --git a/pkg/statistics/integration_test.go b/pkg/statistics/integration_test.go index 6f498c125ec60..45bbe55950e6d 100644 --- a/pkg/statistics/integration_test.go +++ b/pkg/statistics/integration_test.go @@ -27,6 +27,7 @@ import ( metamodel "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/statistics" + statstestutil "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/analyzehelper" "github.com/pingcap/tidb/pkg/testkit/testdata" @@ -344,7 +345,8 @@ func TestOutdatedStatsCheck(t *testing.T) { h := dom.StatsHandle() tk.MustExec("use test") tk.MustExec("create table t (a int)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1)" + strings.Repeat(", (1)", 19)) // 20 rows analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -407,7 +409,8 @@ func TestShowHistogramsLoadStatus(t *testing.T) { defer func() { h.SetLease(origLease) }() tk.MustExec("use test") tk.MustExec("create table t(a int primary key, b int, c int, index idx(b, c))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1,2,3), (4,5,6)") require.NoError(t, h.DumpStatsDeltaToKV(true)) tk.MustExec("analyze table t") @@ -424,7 +427,8 @@ func TestSingleColumnIndexNDV(t *testing.T) { h := dom.StatsHandle() tk.MustExec("use test") tk.MustExec("create table t(a int, b int, c varchar(20), d varchar(20), index idx_a(a), index idx_b(b), index idx_c(c), index idx_d(d))") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t values (1, 1, 'xxx', 'zzz'), (2, 2, 'yyy', 'zzz'), (1, 3, null, 'zzz')") for i := 0; i < 5; i++ { tk.MustExec("insert into t select * from t") @@ -453,7 +457,8 @@ func TestColumnStatsLazyLoad(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t(a int, b int)") tk.MustExec("insert into t values (1,2), (3,4), (5,6), (7,8)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) analyzehelper.TriggerPredicateColumnsCollection(t, tk, store, "t", "a", "b") tk.MustExec("analyze table t") is := dom.InfoSchema() @@ -476,7 +481,8 @@ func TestUpdateNotLoadIndexFMSketch(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t(a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (10),partition p1 values less than maxvalue)") tk.MustExec("insert into t values (1,2), (3,4), (5,6), (7,8)") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("analyze table t") is := dom.InfoSchema() tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) @@ -499,7 +505,8 @@ func TestIssue44369(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t(a int, b int, index iab(a,b));") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) tk.MustExec("insert into t value(1,1);") require.NoError(t, h.DumpStatsDeltaToKV(true)) tk.MustExec("analyze table t;") @@ -517,7 +524,8 @@ func TestTableLastAnalyzeVersion(t *testing.T) { // Only create table should not set the last_analyze_version tk.MustExec("use test") tk.MustExec("create table t(a int);") - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) is := dom.InfoSchema() require.NoError(t, h.Update(context.Background(), is)) tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) @@ -531,7 +539,8 @@ func TestTableLastAnalyzeVersion(t *testing.T) { is = dom.InfoSchema() tbl, err = is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) - require.NoError(t, h.HandleDDLEvent(<-h.DDLEventCh())) + err = statstestutil.HandleNextDDLEventWithTxn(h) + require.NoError(t, err) require.NoError(t, h.Update(context.Background(), is)) statsTbl, found = h.Get(tbl.Meta().ID) require.True(t, found) @@ -598,13 +607,15 @@ func TestLastAnalyzeVersionNotChangedWithAsyncStatsLoad(t *testing.T) { tk.MustExec("set @@tidb_stats_load_sync_wait = 0;") tk.MustExec("use test") tk.MustExec("create table t(a int, b int);") - require.NoError(t, dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh())) + err := statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) + require.NoError(t, err) require.NoError(t, dom.StatsHandle().Update(context.Background(), dom.InfoSchema())) tk.MustExec("insert into t values (1, 1);") - err := dom.StatsHandle().DumpStatsDeltaToKV(true) + err = dom.StatsHandle().DumpStatsDeltaToKV(true) require.NoError(t, err) tk.MustExec("alter table t add column c int default 1;") - dom.StatsHandle().HandleDDLEvent(<-dom.StatsHandle().DDLEventCh()) + err = statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) + require.NoError(t, err) tk.MustExec("select * from t where a = 1 or b = 1 or c = 1;") require.NoError(t, dom.StatsHandle().LoadNeededHistograms(dom.InfoSchema())) result := tk.MustQuery("show stats_meta where table_name = 't'")