Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statistics: refactor common functions into subscriber #58127

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 0 additions & 51 deletions pkg/statistics/handle/ddl/drop_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,8 @@
package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/statistics/handle/lockstats"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
)

Expand All @@ -46,47 +39,3 @@ func (h *ddlHandlerImpl) onDropPartitions(t *notifier.SchemaChangeEvent) error {

return nil
}

func updateGlobalTableStats4DropPartition(
ctx context.Context,
sctx sessionctx.Context,
globalTableInfo *model.TableInfo,
droppedPartitionInfo *model.PartitionInfo,
) error {
count := int64(0)
for _, def := range droppedPartitionInfo.Definitions {
// Get the count and modify count of the partition.
tableCount, _, _, err := storage.StatsMetaCountAndModifyCount(ctx, sctx, def.ID)
if err != nil {
return err
}
count += tableCount
}
if count == 0 {
return nil
}

lockedTables, err := lockstats.QueryLockedTables(ctx, sctx)
if err != nil {
return errors.Trace(err)
}
isLocked := false
if _, ok := lockedTables[globalTableInfo.ID]; ok {
isLocked = true
}
startTS, err := util.GetStartTS(sctx)
if err != nil {
return errors.Trace(err)
}

// Because we drop the partition, we should subtract the count from the global stats.
delta := -count
return errors.Trace(storage.UpdateStatsMeta(
ctx,
sctx,
startTS,
variable.TableDelta{Count: count, Delta: delta},
globalTableInfo.ID,
isLocked,
))
}
131 changes: 0 additions & 131 deletions pkg/statistics/handle/ddl/exchange_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,9 @@
package ddl

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/notifier"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/statistics/handle/logutil"
"github.com/pingcap/tidb/pkg/statistics/handle/storage"
"github.com/pingcap/tidb/pkg/statistics/handle/util"
"go.uber.org/zap"
)

func (h *ddlHandlerImpl) onExchangeAPartition(t *notifier.SchemaChangeEvent) error {
Expand All @@ -42,126 +34,3 @@ func (h *ddlHandlerImpl) onExchangeAPartition(t *notifier.SchemaChangeEvent) err
)
}, util.FlagWrapTxn)
}

func updateGlobalTableStats4ExchangePartition(
ctx context.Context,
sctx sessionctx.Context,
globalTableInfo *model.TableInfo,
originalPartInfo *model.PartitionInfo,
originalTableInfo *model.TableInfo,
) error {
partCount, partModifyCount, tableCount, tableModifyCount, err := getCountsAndModifyCounts(
ctx,
sctx,
originalPartInfo.Definitions[0].ID,
originalTableInfo.ID,
)
if err != nil {
return errors.Trace(err)
}

// The count of the partition should be added to the table.
// The formula is: total_count = original_table_count - original_partition_count + new_table_count.
// So the delta is : new_table_count - original_partition_count.
countDelta := tableCount - partCount
// Initially, the sum of tableCount and partCount represents
// the operation of deleting the partition and adding the table.
// Therefore, they are considered as modifyCountDelta.
// Next, since the old partition no longer belongs to the table,
// the modify count of the partition should be subtracted.
// The modify count of the table should be added as we are adding the table as a partition.
modifyCountDelta := (tableCount + partCount) - partModifyCount + tableModifyCount

if modifyCountDelta == 0 && countDelta == 0 {
return nil
}

// Update the global stats.
is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema)
globalTableSchema, ok := infoschema.SchemaByTable(is, globalTableInfo)
if !ok {
return errors.Errorf("schema not found for table %s", globalTableInfo.Name.O)
}
if err = updateStatsWithCountDeltaAndModifyCountDelta(
ctx,
sctx,
globalTableInfo.ID, countDelta, modifyCountDelta,
); err != nil {
fields := exchangePartitionLogFields(
globalTableSchema.Name.O,
globalTableInfo,
originalPartInfo.Definitions[0],
originalTableInfo,
countDelta, modifyCountDelta,
partCount,
partModifyCount,
tableCount,
tableModifyCount,
)
fields = append(fields, zap.Error(err))
logutil.StatsLogger().Error(
"Update global stats after exchange partition failed",
fields...,
)
return errors.Trace(err)
}
logutil.StatsLogger().Info(
"Update global stats after exchange partition",
exchangePartitionLogFields(
globalTableSchema.Name.O,
globalTableInfo,
originalPartInfo.Definitions[0],
originalTableInfo,
countDelta, modifyCountDelta,
partCount,
partModifyCount,
tableCount,
tableModifyCount,
)...,
)
return nil
}

func getCountsAndModifyCounts(
ctx context.Context,
sctx sessionctx.Context,
partitionID, tableID int64,
) (partCount, partModifyCount, tableCount, tableModifyCount int64, err error) {
partCount, partModifyCount, _, err = storage.StatsMetaCountAndModifyCount(ctx, sctx, partitionID)
if err != nil {
return
}

tableCount, tableModifyCount, _, err = storage.StatsMetaCountAndModifyCount(ctx, sctx, tableID)
if err != nil {
return
}

return
}

func exchangePartitionLogFields(
globalTableSchemaName string,
globalTableInfo *model.TableInfo,
originalPartDef model.PartitionDefinition,
originalTableInfo *model.TableInfo,
countDelta, modifyCountDelta,
partCount, partModifyCount,
tableCount, tableModifyCount int64,
) []zap.Field {
return []zap.Field{
zap.String("globalTableSchema", globalTableSchemaName),
zap.Int64("globalTableID", globalTableInfo.ID),
zap.String("globalTableName", globalTableInfo.Name.O),
zap.Int64("countDelta", countDelta),
zap.Int64("modifyCountDelta", modifyCountDelta),
zap.Int64("partitionID", originalPartDef.ID),
zap.String("partitionName", originalPartDef.Name.O),
zap.Int64("partitionCount", partCount),
zap.Int64("partitionModifyCount", partModifyCount),
zap.Int64("tableID", originalTableInfo.ID),
zap.String("tableName", originalTableInfo.Name.O),
zap.Int64("tableCount", tableCount),
zap.Int64("tableModifyCount", tableModifyCount),
}
}
Loading