Skip to content

Commit

Permalink
statistics: use DDL subscriber updating stats meta
Browse files Browse the repository at this point in the history
Signed-off-by: Rustin170506 <tech@rustin.me>
  • Loading branch information
Rustin170506 committed Dec 2, 2024
1 parent 6d74071 commit 1c856d8
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 202 deletions.
21 changes: 0 additions & 21 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2398,9 +2398,6 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
// This is because the update worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
do.wg.Run(func() {
do.handleDDLEvent()
}, "handleDDLEvent")
// Wait for the stats worker to finish the initialization.
// Otherwise, we may start the auto analyze worker before the stats cache is initialized.
do.wg.Run(
Expand Down Expand Up @@ -2599,24 +2596,6 @@ func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle)
}
}

// handleDDLEvent is a background loop that receives schema change events from
// the stats handle's DDL event channel and passes each one to HandleDDLEvent.
// It returns once do.exit becomes readable.
func (do *Domain) handleDDLEvent() {
	logutil.BgLogger().Info("handleDDLEvent started.")
	defer util.Recover(metrics.LabelDomain, "handleDDLEvent", nil, false)

	h := do.StatsHandle()
	for {
		select {
		case <-do.exit:
			return
		// Only the DDL owner sends on this channel.
		case event := <-h.DDLEventCh():
			// A failing event is logged and dropped; the loop keeps running.
			if err := h.HandleDDLEvent(event); err != nil {
				logutil.BgLogger().Error("handle ddl event failed", zap.String("event", event.String()), zap.Error(err))
			}
		}
	}
}

func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
Expand Down
165 changes: 4 additions & 161 deletions pkg/statistics/handle/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,18 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/types"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"github.com/pingcap/tidb/pkg/util/intest"
"go.uber.org/zap"
)

type ddlHandlerImpl struct {
ddlEventCh chan *notifier.SchemaChangeEvent
statsWriter types.StatsReadWriter
statsHandler types.StatsHandle
sub *subscriber
}

// NewDDLHandler creates a new ddl handler.
Expand All @@ -46,147 +42,13 @@ func NewDDLHandler(
ddlEventCh: make(chan *notifier.SchemaChangeEvent, 1000),
statsWriter: statsWriter,
statsHandler: statsHandler,
sub: NewSubscriber(statsHandler),
}
}

// HandleDDLEvent begins to process a ddl task.
func (h *ddlHandlerImpl) HandleDDLEvent(s *notifier.SchemaChangeEvent) error {
switch s.GetType() {
case model.ActionCreateTable:
newTableInfo := s.GetCreateTableInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
}
for _, id := range ids {
if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil {
return err
}
}
case model.ActionTruncateTable:
newTableInfo, droppedTableInfo := s.GetTruncateTableInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
}
for _, id := range ids {
if err := h.statsWriter.InsertTableStats2KV(newTableInfo, id); err != nil {
return err
}
}

// Remove the old table stats.
droppedIDs, err := h.getTableIDs(droppedTableInfo)
if err != nil {
return err
}
for _, id := range droppedIDs {
if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil {
return err
}
}
case model.ActionDropTable:
droppedTableInfo := s.GetDropTableInfo()
ids, err := h.getTableIDs(droppedTableInfo)
if err != nil {
return err
}
for _, id := range ids {
if err := h.statsWriter.UpdateStatsMetaVersionForGC(id); err != nil {
return err
}
}
case model.ActionAddColumn:
newTableInfo, newColumnInfo := s.GetAddColumnInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
}
for _, id := range ids {
if err := h.statsWriter.InsertColStats2KV(id, newColumnInfo); err != nil {
return err
}
}
case model.ActionModifyColumn:
newTableInfo, modifiedColumnInfo := s.GetModifyColumnInfo()
ids, err := h.getTableIDs(newTableInfo)
if err != nil {
return err
}
for _, id := range ids {
if err := h.statsWriter.InsertColStats2KV(id, modifiedColumnInfo); err != nil {
return err
}
}
case model.ActionAddTablePartition:
globalTableInfo, addedPartitionInfo := s.GetAddPartitionInfo()
for _, def := range addedPartitionInfo.Definitions {
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
return err
}
}
case model.ActionTruncateTablePartition:
if err := h.onTruncatePartitions(s); err != nil {
return err
}
case model.ActionDropTablePartition:
if err := h.onDropPartitions(s); err != nil {
return err
}
// EXCHANGE PARTITION EVENT NOTES:
// 1. When a partition is exchanged with a system table, we need to adjust the global statistics
// based on the count delta and modify count delta. However, due to the involvement of the system table,
// a complete update of the global statistics is not feasible. Therefore, we bypass the statistics update
// for the table in this scenario. Despite this, the table id still changes, so the statistics for the
// system table will still be visible.
// 2. If the system table is a partitioned table, we will update the global statistics for the partitioned table.
// It is rare to exchange a partition from a system table, so we can ignore this case. In this case,
// the system table will have statistics, but this is not a significant issue.
// So we decided to completely ignore the system table event.
case model.ActionExchangeTablePartition:
if err := h.onExchangeAPartition(s); err != nil {
return err
}
case model.ActionReorganizePartition:
if err := h.onReorganizePartitions(s); err != nil {
return err
}
case model.ActionAlterTablePartitioning:
oldSingleTableID, globalTableInfo, addedPartInfo := s.GetAddPartitioningInfo()
// Add new partition stats.
for _, def := range addedPartInfo.Definitions {
if err := h.statsWriter.InsertTableStats2KV(globalTableInfo, def.ID); err != nil {
return err
}
}
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
return h.statsWriter.ChangeGlobalStatsID(oldSingleTableID, globalTableInfo.ID)
case model.ActionRemovePartitioning:
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
oldTblID, newSingleTableInfo, droppedPartInfo := s.GetRemovePartitioningInfo()
if err := h.statsWriter.ChangeGlobalStatsID(oldTblID, newSingleTableInfo.ID); err != nil {
return err
}

// Remove partition stats.
for _, def := range droppedPartInfo.Definitions {
if err := h.statsWriter.UpdateStatsMetaVersionForGC(def.ID); err != nil {
return err
}
}
case model.ActionFlashbackCluster:
return h.statsWriter.UpdateStatsVersion()
case model.ActionAddIndex:
// No need to update the stats meta for the adding index event.
case model.ActionDropSchema:
// TODO: handle the drop schema event.
default:
intest.Assert(false)
logutil.StatsLogger().Error("Unhandled schema change event", zap.Stringer("type", s))
}
return nil
func (h *ddlHandlerImpl) HandleDDLEvent(ctx context.Context, sctx sessionctx.Context, s *notifier.SchemaChangeEvent) error {
return h.sub.handle(ctx, sctx, s)
}

// UpdateStatsWithCountDeltaAndModifyCountDeltaForTest updates the global stats with the given count delta and modify count delta.
Expand Down Expand Up @@ -282,25 +144,6 @@ func updateStatsWithCountDeltaAndModifyCountDelta(
return err
}

// getTableIDs collects the IDs whose stats records are affected by a change to
// tblInfo.
//
// For a table without partition info the result is just the table's own ID.
// For a partitioned table the result holds the ID of every partition
// definition, followed by the table's own ID when the current partition prune
// mode is dynamic.
//
// An error is returned only when the current prune mode cannot be read; ids is
// nil in that case.
func (h *ddlHandlerImpl) getTableIDs(tblInfo *model.TableInfo) (ids []int64, err error) {
	partInfo := tblInfo.GetPartitionInfo()
	if partInfo == nil {
		return []int64{tblInfo.ID}, nil
	}

	// Reserve one extra slot for the table-level ID that dynamic mode appends.
	ids = make([]int64, 0, len(partInfo.Definitions)+1)
	for i := range partInfo.Definitions {
		ids = append(ids, partInfo.Definitions[i].ID)
	}

	mode, err := util.GetCurrentPruneMode(h.statsHandler.SPool())
	if err != nil {
		return nil, err
	}
	if variable.PartitionPruneMode(mode) != variable.Dynamic {
		return ids, nil
	}
	return append(ids, tblInfo.ID), nil
}

// DDLEventCh returns ddl events channel in handle.
func (h *ddlHandlerImpl) DDLEventCh() chan *notifier.SchemaChangeEvent {
return h.ddlEventCh
Expand Down
24 changes: 11 additions & 13 deletions pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,19 @@ import (
"go.uber.org/zap"
)

type handler struct {
type subscriber struct {
statsCache types.StatsCache
}

// NewHandlerAndRegister creates a new handler and registers it to the DDL
// notifier.
func NewHandlerAndRegister(
// NewSubscriber creates a new subscriber.
func NewSubscriber(
statsCache types.StatsCache,
registry *notifier.DDLNotifier,
) {
h := handler{statsCache: statsCache}
registry.RegisterHandler(notifier.StatsMetaHandlerID, h.handle)
) *subscriber {
h := subscriber{statsCache: statsCache}
return &h
}

func (h handler) handle(
func (h subscriber) handle(
ctx context.Context,
sctx sessionctx.Context,
change *notifier.SchemaChangeEvent,
Expand Down Expand Up @@ -243,7 +241,7 @@ func (h handler) handle(
return nil
}

func (h handler) insertStats4PhysicalID(
func (h subscriber) insertStats4PhysicalID(
ctx context.Context,
sctx sessionctx.Context,
info *model.TableInfo,
Expand All @@ -256,7 +254,7 @@ func (h handler) insertStats4PhysicalID(
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
}

func (h handler) recordHistoricalStatsMeta(
func (h subscriber) recordHistoricalStatsMeta(
ctx context.Context,
sctx sessionctx.Context,
id int64,
Expand Down Expand Up @@ -287,7 +285,7 @@ func (h handler) recordHistoricalStatsMeta(
)
}

func (h handler) delayedDeleteStats4PhysicalID(
func (h subscriber) delayedDeleteStats4PhysicalID(
ctx context.Context,
sctx sessionctx.Context,
id int64,
Expand All @@ -299,7 +297,7 @@ func (h handler) delayedDeleteStats4PhysicalID(
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
}

func (h handler) insertStats4Col(
func (h subscriber) insertStats4Col(
ctx context.Context,
sctx sessionctx.Context,
physicalID int64,
Expand Down
9 changes: 3 additions & 6 deletions pkg/statistics/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func NewHandle(
handle.StatsReadWriter,
handle,
)
if ddlNotifier != nil {
ddlNotifier.RegisterHandler(notifier.StatsMetaHandlerID, handle.DDL.HandleDDLEvent)
}
return handle, nil
}

Expand Down Expand Up @@ -195,12 +198,6 @@ func (h *Handle) getPartitionStats(tblInfo *model.TableInfo, pid int64, returnPs

// FlushStats flushes the cached stats update into store.
func (h *Handle) FlushStats() {
for len(h.DDLEventCh()) > 0 {
e := <-h.DDLEventCh()
if err := h.HandleDDLEvent(e); err != nil {
statslogutil.StatsLogger().Error("handle ddl event fail", zap.Error(err))
}
}
if err := h.DumpStatsDeltaToKV(true); err != nil {
statslogutil.StatsLogger().Error("dump stats delta fail", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ type StatsGlobal interface {
// DDL is used to handle ddl events.
type DDL interface {
// HandleDDLEvent handles ddl events.
HandleDDLEvent(changeEvent *notifier.SchemaChangeEvent) error
HandleDDLEvent(ctx context.Context, sctx sessionctx.Context, changeEvent *notifier.SchemaChangeEvent) error
// DDLEventCh returns ddl events channel in handle.
DDLEventCh() chan *notifier.SchemaChangeEvent
}
Expand Down

0 comments on commit 1c856d8

Please sign in to comment.