Skip to content

Commit

Permalink
stats: implement stats handler for DDL notifier part 2 (pingcap#56519)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Oct 10, 2024
1 parent 2864508 commit 9b1fa1c
Showing 1 changed file with 85 additions and 7 deletions.
92 changes: 85 additions & 7 deletions pkg/statistics/handle/ddl/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,90 @@ func (h handler) handle(
}
}
case model.ActionAddColumn:
// TODO: implement me
newTableInfo, newColumnInfo := change.GetAddColumnInfo()
ids, err := getPhysicalIDs(sctx, newTableInfo)
if err != nil {
return errors.Trace(err)
}
for _, id := range ids {
if err = h.insertStats4Col(ctx, sctx, id, newColumnInfo); err != nil {
return errors.Trace(err)
}
}
case model.ActionModifyColumn:
// TODO: implement me
newTableInfo, modifiedColumnInfo := change.GetModifyColumnInfo()
ids, err := getPhysicalIDs(sctx, newTableInfo)
if err != nil {
return errors.Trace(err)
}
for _, id := range ids {
if err = h.insertStats4Col(ctx, sctx, id, modifiedColumnInfo); err != nil {
return errors.Trace(err)
}
}
case model.ActionAddTablePartition:
// TODO: implement me
globalTableInfo, addedPartitionInfo := change.GetAddPartitionInfo()
for _, def := range addedPartitionInfo.Definitions {
if err := h.insertStats4PhysicalID(ctx, sctx, globalTableInfo, def.ID); err != nil {
return errors.Trace(err)
}
}
case model.ActionTruncateTablePartition:
// TODO: implement me
case model.ActionDropTablePartition:
// TODO: implement me
case model.ActionExchangeTablePartition:
// TODO: implement me
case model.ActionReorganizePartition:
// TODO: implement me
globalTableInfo, addedPartInfo, droppedPartitionInfo := change.GetReorganizePartitionInfo()
// Avoid updating global stats as the data remains unchanged.
// For new partitions, it's crucial to correctly insert the count and modify count correctly.
// However, this is challenging due to the need to know the count of the new partitions.
// Given that a partition can be split into two, determining the count of the new partitions is so hard.
// It's acceptable to not update it immediately,
// as the new partitions will be analyzed shortly due to the absence of statistics for them.
// Therefore, the auto-analyze worker will handle them in the near future.
for _, def := range addedPartInfo.Definitions {
if err := h.insertStats4PhysicalID(ctx, sctx, globalTableInfo, def.ID); err != nil {
return err
}
}

// Reset the partition stats.
for _, def := range droppedPartitionInfo.Definitions {
if err := h.delayedDeleteStats4PhysicalID(ctx, sctx, def.ID); err != nil {
return err
}
}

return nil
case model.ActionAlterTablePartitioning:
// TODO: implement me
oldSingleTableID, globalTableInfo, addedPartInfo := change.GetAddPartitioningInfo()
// Add new partition stats.
for _, def := range addedPartInfo.Definitions {
if err := h.insertStats4PhysicalID(ctx, sctx, globalTableInfo, def.ID); err != nil {
return errors.Trace(err)
}
}
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
return errors.Trace(storage.ChangeGlobalStatsID(ctx, sctx, oldSingleTableID, globalTableInfo.ID))
case model.ActionRemovePartitioning:
// TODO: implement me
// Change id for global stats, since the data has not changed!
// Note: This operation will update all tables related to statistics with the new ID.
oldTblID, newSingleTableInfo, droppedPartInfo := change.GetRemovePartitioningInfo()
if err := storage.ChangeGlobalStatsID(ctx, sctx, oldTblID, newSingleTableInfo.ID); err != nil {
return errors.Trace(err)
}

// Remove partition stats.
for _, def := range droppedPartInfo.Definitions {
if err := h.delayedDeleteStats4PhysicalID(ctx, sctx, def.ID); err != nil {
return errors.Trace(err)
}
}
case model.ActionFlashbackCluster:
// TODO: implement me
return errors.Trace(storage.UpdateStatsVersion(ctx, sctx))
default:
intest.Assert(false)
logutil.StatsLogger().Error("Unhandled schema change event",
Expand Down Expand Up @@ -180,6 +245,19 @@ func (h handler) delayedDeleteStats4PhysicalID(
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, id, startTS))
}

func (h handler) insertStats4Col(
ctx context.Context,
sctx sessionctx.Context,
physicalID int64,
colInfos []*model.ColumnInfo,
) error {
startTS, err := storage.InsertColStats2KV(ctx, sctx, physicalID, colInfos)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(h.recordHistoricalStatsMeta(ctx, sctx, physicalID, startTS))
}

func getPhysicalIDs(
sctx sessionctx.Context,
tblInfo *model.TableInfo,
Expand Down

0 comments on commit 9b1fa1c

Please sign in to comment.