Skip to content

Commit

Permalink
statistics: refactor common functions into subscriber (pingcap#58127)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Dec 20, 2024
1 parent 13a039b commit 0ed71f9
Show file tree
Hide file tree
Showing 4 changed files with 284 additions and 307 deletions.
51 changes: 0 additions & 51 deletions pkg/statistics/handle/ddl/drop_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
)

Expand All @@ -46,47 +39,3 @@ func (h *ddlHandlerImpl) onDropPartitions(t *notifier.SchemaChangeEvent) error {

return nil
}

func updateGlobalTableStats4DropPartition(
ctx context.Context,
sctx sessionctx.Context,
globalTableInfo *model.TableInfo,
droppedPartitionInfo *model.PartitionInfo,
) error {
count := int64(0)
for _, def := range droppedPartitionInfo.Definitions {
// Get the count and modify count of the partition.
tableCount, _, _, err := storage.StatsMetaCountAndModifyCount(ctx, sctx, def.ID)
if err != nil {
return err
}
count += tableCount
}
if count == 0 {
return nil
}

lockedTables, err := lockstats.QueryLockedTables(ctx, sctx)
if err != nil {
return errors.Trace(err)
}
isLocked := false
if _, ok := lockedTables[globalTableInfo.ID]; ok {
isLocked = true
}
startTS, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}

// Because we drop the partition, we should subtract the count from the global stats.
delta := -count
return errors.Trace(storage.UpdateStatsMeta(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
))
}
131 changes: 0 additions & 131 deletions pkg/statistics/handle/ddl/exchange_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,9 @@
package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"go.uber.org/zap"
)

func (h *ddlHandlerImpl) onExchangeAPartition(t *notifier.SchemaChangeEvent) error {
Expand All @@ -42,126 +34,3 @@ func (h *ddlHandlerImpl) onExchangeAPartition(t *notifier.SchemaChangeEvent) err
)
}, util.FlagWrapTxn)
}

func updateGlobalTableStats4ExchangePartition(
ctx context.Context,
sctx sessionctx.Context,
globalTableInfo *model.TableInfo,
originalPartInfo *model.PartitionInfo,
originalTableInfo *model.TableInfo,
) error {
partCount, partModifyCount, tableCount, tableModifyCount, err := getCountsAndModifyCounts(
ctx,
sctx,
originalPartInfo.Definitions[0].ID,
originalTableInfo.ID,
)
if err != nil {
return errors.Trace(err)
}

// The count of the partition should be added to the table.
// The formula is: total_count = original_table_count - original_partition_count + new_table_count.
// So the delta is : new_table_count - original_partition_count.
countDelta := tableCount - partCount
// Initially, the sum of tableCount and partCount represents
// the operation of deleting the partition and adding the table.
// Therefore, they are considered as modifyCountDelta.
// Next, since the old partition no longer belongs to the table,
// the modify count of the partition should be subtracted.
// The modify count of the table should be added as we are adding the table as a partition.
modifyCountDelta := (tableCount + partCount) - partModifyCount + tableModifyCount

if modifyCountDelta == 0 && countDelta == 0 {
return nil
}

// Update the global stats.
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
globalTableSchema, ok := infoschema.SchemaByTable(is, globalTableInfo)
if !ok {
return errors.Errorf("schema not found for table %s", globalTableInfo.Name.O)
}
if err = updateStatsWithCountDeltaAndModifyCountDelta(
ctx,
sctx,
globalTableInfo.ID, countDelta, modifyCountDelta,
); err != nil {
fields := exchangePartitionLogFields(
globalTableSchema.Name.O,
globalTableInfo,
originalPartInfo.Definitions[0],
originalTableInfo,
countDelta, modifyCountDelta,
partCount,
partModifyCount,
tableCount,
tableModifyCount,
)
fields = append(fields, zap.Error(err))
logutil.StatsLogger().Error(
"Update global stats after exchange partition failed",
fields...,
)
return errors.Trace(err)
}
logutil.StatsLogger().Info(
"Update global stats after exchange partition",
exchangePartitionLogFields(
globalTableSchema.Name.O,
globalTableInfo,
originalPartInfo.Definitions[0],
originalTableInfo,
countDelta, modifyCountDelta,
partCount,
partModifyCount,
tableCount,
tableModifyCount,
)...,
)
return nil
}

func getCountsAndModifyCounts(
ctx context.Context,
sctx sessionctx.Context,
partitionID, tableID int64,
) (partCount, partModifyCount, tableCount, tableModifyCount int64, err error) {
partCount, partModifyCount, _, err = storage.StatsMetaCountAndModifyCount(ctx, sctx, partitionID)
if err != nil {
return
}

tableCount, tableModifyCount, _, err = storage.StatsMetaCountAndModifyCount(ctx, sctx, tableID)
if err != nil {
return
}

return
}

func exchangePartitionLogFields(
globalTableSchemaName string,
globalTableInfo *model.TableInfo,
originalPartDef model.PartitionDefinition,
originalTableInfo *model.TableInfo,
countDelta, modifyCountDelta,
partCount, partModifyCount,
tableCount, tableModifyCount int64,
) []zap.Field {
return []zap.Field{
zap.String("globalTableSchema", globalTableSchemaName),
zap.Int64("globalTableID", globalTableInfo.ID),
zap.String("globalTableName", globalTableInfo.Name.O),
zap.Int64("countDelta", countDelta),
zap.Int64("modifyCountDelta", modifyCountDelta),
zap.Int64("partitionID", originalPartDef.ID),
zap.String("partitionName", originalPartDef.Name.O),
zap.Int64("partitionCount", partCount),
zap.Int64("partitionModifyCount", partModifyCount),
zap.Int64("tableID", originalTableInfo.ID),
zap.String("tableName", originalTableInfo.Name.O),
zap.Int64("tableCount", tableCount),
zap.Int64("tableModifyCount", tableModifyCount),
}
}
Loading

0 comments on commit 0ed71f9

Please sign in to comment.