From 1c856d89ebe281ebfbd42d6131907d4fc6dbd672 Mon Sep 17 00:00:00 2001 From: Rustin170506 Date: Mon, 2 Dec 2024 14:01:10 +0800 Subject: [PATCH] statistics: use DDL subscriber updating stats meta Signed-off-by: Rustin170506 --- pkg/domain/domain.go | 21 --- pkg/statistics/handle/ddl/ddl.go | 165 +--------------------- pkg/statistics/handle/ddl/subscriber.go | 24 ++-- pkg/statistics/handle/handle.go | 9 +- pkg/statistics/handle/types/interfaces.go | 2 +- 5 files changed, 19 insertions(+), 202 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index d12c3e95a659e..5c74f616a5671 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2398,9 +2398,6 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err // This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations. // These tasks do not interfere with or depend on the initialization process. do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker") - do.wg.Run(func() { - do.handleDDLEvent() - }, "handleDDLEvent") // Wait for the stats worker to finish the initialization. // Otherwise, we may start the auto analyze worker before the stats cache is initialized. do.wg.Run( @@ -2599,24 +2596,6 @@ func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) } } -func (do *Domain) handleDDLEvent() { - logutil.BgLogger().Info("handleDDLEvent started.") - defer util.Recover(metrics.LabelDomain, "handleDDLEvent", nil, false) - statsHandle := do.StatsHandle() - for { - select { - case <-do.exit: - return - // This channel is sent only by ddl owner. - case t := <-statsHandle.DDLEventCh(): - err := statsHandle.HandleDDLEvent(t) - if err != nil { - logutil.BgLogger().Error("handle ddl event failed", zap.String("event", t.String()), zap.Error(err)) - } - } - } -} - func (do *Domain) updateStatsWorker(_ sessionctx.Context) { defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false) logutil.BgLogger().Info("updateStatsWorker started.") diff --git a/pkg/statistics/handle/ddl/ddl.go b/pkg/statistics/handle/ddl/ddl.go index 0df53c68661a5..f1a79e7d3f9bd 100644 --- a/pkg/statistics/handle/ddl/ddl.go +++ b/pkg/statistics/handle/ddl/ddl.go @@ -19,22 +19,18 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl/notifier" - "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/statistics/handle/lockstats" - "github.com/pingcap/tidb/pkg/statistics/handle/logutil" "github.com/pingcap/tidb/pkg/statistics/handle/storage" "github.com/pingcap/tidb/pkg/statistics/handle/types" "github.com/pingcap/tidb/pkg/statistics/handle/util" - "github.com/pingcap/tidb/pkg/util/intest" - "go.uber.org/zap" ) type ddlHandlerImpl struct { ddlEventCh chan *notifier.SchemaChangeEvent statsWriter types.StatsReadWriter statsHandler types.StatsHandle + sub *subscriber } // NewDDLHandler creates a new ddl handler. @@ -46,147 +42,13 @@ func NewDDLHandler( ddlEventCh: make(chan *notifier.SchemaChangeEvent, 1000), statsWriter: statsWriter, statsHandler: statsHandler, + sub: NewSubscriber(statsHandler), } } // HandleDDLEvent begins to process a ddl task. -func (h *ddlHandlerImpl) HandleDDLEvent(s *notifier.SchemaChangeEvent) error { - switch s.GetType() { - case model.ActionCreateTable: - newTableInfo := s.GetCreateTableInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { - return err - } - } - case model.ActionTruncateTable: - newTableInfo, droppedTableInfo := s.GetTruncateTableInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil { - return err - } - } - - // Remove the old table stats. - droppedIDs, err := h.getTableIDs(droppedTableInfo) - if err != nil { - return err - } - for _, id := range droppedIDs { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil { - return err - } - } - case model.ActionDropTable: - droppedTableInfo := s.GetDropTableInfo() - ids, err := h.getTableIDs(droppedTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil { - return err - } - } - case model.ActionAddColumn: - newTableInfo, newColumnInfo := s.GetAddColumnInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertColStats2KV(id, newColumnInfo); err != nil { - return err - } - } - case model.ActionModifyColumn: - newTableInfo, modifiedColumnInfo := s.GetModifyColumnInfo() - ids, err := h.getTableIDs(newTableInfo) - if err != nil { - return err - } - for _, id := range ids { - if err := h.statsWriter.InsertColStats2KV(id, modifiedColumnInfo); err != nil { - return err - } - } - case model.ActionAddTablePartition: - globalTableInfo, addedPartitionInfo := s.GetAddPartitionInfo() - for _, def := range addedPartitionInfo.Definitions { - if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil { - return err - } - } - case model.ActionTruncateTablePartition: - if err := h.onTruncatePartitions(s); err != nil { - return err - } - case model.ActionDropTablePartition: - if err := h.onDropPartitions(s); err != nil { - return err - } - // EXCHANGE PARTITION EVENT NOTES: - // 1. When a partition is exchanged with a system table, we need to adjust the global statistics - // based on the count delta and modify count delta. However, due to the involvement of the system table, - // a complete update of the global statistics is not feasible. Therefore, we bypass the statistics update - // for the table in this scenario. Despite this, the table id still changes, so the statistics for the - // system table will still be visible. - // 2. If the system table is a partitioned table, we will update the global statistics for the partitioned table. - // It is rare to exchange a partition from a system table, so we can ignore this case. In this case, - // the system table will have statistics, but this is not a significant issue. - // So we decided to completely ignore the system table event. - case model.ActionExchangeTablePartition: - if err := h.onExchangeAPartition(s); err != nil { - return err - } - case model.ActionReorganizePartition: - if err := h.onReorganizePartitions(s); err != nil { - return err - } - case model.ActionAlterTablePartitioning: - oldSingleTableID, globalTableInfo, addedPartInfo := s.GetAddPartitioningInfo() - // Add new partition stats. - for _, def := range addedPartInfo.Definitions { - if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil { - return err - } - } - // Change id for global stats, since the data has not changed! - // Note: This operation will update all tables related to statistics with the new ID. - return h.statsWriter.ChangeGlobalStatsID(oldSingleTableID, globalTableInfo.ID) - case model.ActionRemovePartitioning: - // Change id for global stats, since the data has not changed! - // Note: This operation will update all tables related to statistics with the new ID. - oldTblID, newSingleTableInfo, droppedPartInfo := s.GetRemovePartitioningInfo() - if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil { - return err - } - - // Remove partition stats. - for _, def := range droppedPartInfo.Definitions { - if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil { - return err - } - } - case model.ActionFlashbackCluster: - return h.statsWriter.UpdateStatsVersion() - case model.ActionAddIndex: - // No need to update the stats meta for the adding index event. - case model.ActionDropSchema: - // TODO: handle the drop schema event. - default: - intest.Assert(false) - logutil.StatsLogger().Error("Unhandled schema change event", zap.Stringer("type", s)) - } - return nil +func (h *ddlHandlerImpl) HandleDDLEvent(ctx context.Context, sctx sessionctx.Context, s *notifier.SchemaChangeEvent) error { + return h.sub.handle(ctx, sctx, s) } // UpdateStatsWithCountDeltaAndModifyCountDeltaForTest updates the global stats with the given count delta and modify count delta. @@ -282,25 +144,6 @@ func updateStatsWithCountDeltaAndModifyCountDelta( return err } -func (h *ddlHandlerImpl) getTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) { - pi := tblInfo.GetPartitionInfo() - if pi == nil { - return []int64{tblInfo.ID}, nil - } - ids = make([]int64, 0, len(pi.Definitions)+1) - for _, def := range pi.Definitions { - ids = append(ids, def.ID) - } - pruneMode, err := util.GetCurrentPruneMode(h.statsHandler.SPool()) - if err != nil { - return nil, err - } - if variable.PartitionPruneMode(pruneMode) == variable.Dynamic { - ids = append(ids, tblInfo.ID) - } - return ids, nil -} - // DDLEventCh returns ddl events channel in handle. func (h *ddlHandlerImpl) DDLEventCh() chan *notifier.SchemaChangeEvent { return h.ddlEventCh diff --git a/pkg/statistics/handle/ddl/subscriber.go b/pkg/statistics/handle/ddl/subscriber.go index 66fcb3a1eba37..3ad229bb23340 100644 --- a/pkg/statistics/handle/ddl/subscriber.go +++ b/pkg/statistics/handle/ddl/subscriber.go @@ -31,21 +31,19 @@ import ( "go.uber.org/zap" ) -type handler struct { +type subscriber struct { statsCache types.StatsCache } -// NewHandlerAndRegister creates a new handler and registers it to the DDL -// notifier. -func NewHandlerAndRegister( +// NewSubscriber creates a new subscriber. +func NewSubscriber( statsCache types.StatsCache, - registry *notifier.DDLNotifier, -) { - h := handler{statsCache: statsCache} - registry.RegisterHandler(notifier.StatsMetaHandlerID, h.handle) +) *subscriber { + h := subscriber{statsCache: statsCache} + return &h } -func (h handler) handle( +func (h subscriber) handle( ctx context.Context, sctx sessionctx.Context, change *notifier.SchemaChangeEvent, @@ -243,7 +241,7 @@ func (h handler) handle( return nil } -func (h handler) insertStats4PhysicalID( +func (h subscriber) insertStats4PhysicalID( ctx context.Context, sctx sessionctx.Context, info *model.TableInfo, @@ -256,7 +254,7 @@ func (h handler) insertStats4PhysicalID( return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) } -func (h handler) recordHistoricalStatsMeta( +func (h subscriber) recordHistoricalStatsMeta( ctx context.Context, sctx sessionctx.Context, id int64, @@ -287,7 +285,7 @@ func (h handler) recordHistoricalStatsMeta( ) } -func (h handler) delayedDeleteStats4PhysicalID( +func (h subscriber) delayedDeleteStats4PhysicalID( ctx context.Context, sctx sessionctx.Context, id int64, @@ -299,7 +297,7 @@ func (h handler) delayedDeleteStats4PhysicalID( return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS)) } -func (h handler) insertStats4Col( +func (h subscriber) insertStats4Col( ctx context.Context, sctx sessionctx.Context, physicalID int64, diff --git a/pkg/statistics/handle/handle.go b/pkg/statistics/handle/handle.go index b74ca839a1f64..b34608c9f455d 100644 --- a/pkg/statistics/handle/handle.go +++ b/pkg/statistics/handle/handle.go @@ -146,6 +146,9 @@ func NewHandle( handle.StatsReadWriter, handle, ) + if ddlNotifier != nil { + ddlNotifier.RegisterHandler(notifier.StatsMetaHandlerID, handle.DDL.HandleDDLEvent) + } return handle, nil } @@ -195,12 +198,6 @@ func (h *Handle) getPartitionStats(tblInfo *model.TableInfo, pid int64, returnPs // FlushStats flushes the cached stats update into store. func (h *Handle) FlushStats() { - for len(h.DDLEventCh()) > 0 { - e := <-h.DDLEventCh() - if err := h.HandleDDLEvent(e); err != nil { - statslogutil.StatsLogger().Error("handle ddl event fail", zap.Error(err)) - } - } if err := h.DumpStatsDeltaToKV(true); err != nil { statslogutil.StatsLogger().Error("dump stats delta fail", zap.Error(err)) } diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go index 3fd8a4711fa08..d9c17951d9fa5 100644 --- a/pkg/statistics/handle/types/interfaces.go +++ b/pkg/statistics/handle/types/interfaces.go @@ -496,7 +496,7 @@ type StatsGlobal interface { // DDL is used to handle ddl events. type DDL interface { // HandleDDLEvent handles ddl events. - HandleDDLEvent(changeEvent *notifier.SchemaChangeEvent) error + HandleDDLEvent(ctx context.Context, sctx sessionctx.Context, changeEvent *notifier.SchemaChangeEvent) error // DDLEventCh returns ddl events channel in handle. DDLEventCh() chan *notifier.SchemaChangeEvent }