Skip to content

Commit

Permalink
statistics: skip update global count and modify_count if only the par…
Browse files Browse the repository at this point in the history
…tition is locked (#47259)

ref #46351
  • Loading branch information
Rustin170506 authored Sep 26, 2023
1 parent 3f09e35 commit c25fd6f
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 6 deletions.
2 changes: 1 addition & 1 deletion statistics/handle/lockstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ go_test(
],
embed = [":lockstats"],
flaky = True,
shard_count = 9,
shard_count = 11,
deps = [
"//kv",
"//parser/mysql",
Expand Down
37 changes: 36 additions & 1 deletion statistics/handle/lockstats/unlock_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func RemoveLockedPartitions(
skippedPartitions = append(skippedPartitions, pidNames[pid])
continue
}
if err := updateStatsAndUnlockTable(ctx, exec, pid); err != nil {
if err := updateStatsAndUnlockPartition(ctx, exec, pid, tid); err != nil {
return "", err
}
}
Expand Down Expand Up @@ -194,6 +194,41 @@ func updateStatsAndUnlockTable(ctx context.Context, exec sqlexec.RestrictedSQLEx
return err
}

// updateStatsAndUnlockPartition also update the stats to the table level.
func updateStatsAndUnlockPartition(ctx context.Context, exec sqlexec.RestrictedSQLExecutor, partitionID int64, tid int64) error {
count, modifyCount, version, err := getStatsDeltaFromTableLocked(ctx, partitionID, exec)
if err != nil {
return err
}

if _, _, err := exec.ExecRestrictedSQL(
ctx,
useCurrentSession,
updateDeltaSQL,
version, count, modifyCount, partitionID,
); err != nil {
return err
}
if _, _, err := exec.ExecRestrictedSQL(
ctx,
useCurrentSession,
updateDeltaSQL,
version, count, modifyCount, tid,
); err != nil {
return err
}
cache.TableRowStatsCache.Invalidate(partitionID)
cache.TableRowStatsCache.Invalidate(tid)

_, _, err = exec.ExecRestrictedSQL(
ctx,
useCurrentSession,
DeleteLockSQL, partitionID,
)

return err
}

// getStatsDeltaFromTableLocked get count, modify_count and version for the given table from mysql.stats_table_locked.
func getStatsDeltaFromTableLocked(ctx context.Context, tableID int64, exec sqlexec.RestrictedSQLExecutor) (count, modifyCount int64, version uint64, err error) {
rows, _, err := exec.ExecRestrictedSQL(
Expand Down
112 changes: 112 additions & 0 deletions statistics/handle/lockstats/unlock_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,3 +236,115 @@ func TestRemoveLockedTables(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "skip unlocking unlocked tables: test.t2, test.t3, other tables unlocked successfully", msg)
}

func TestRemoveLockedPartitions(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
exec := mock.NewMockRestrictedSQLExecutor(ctrl)

// Executed SQL should be:
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
gomock.Eq("BEGIN PESSIMISTIC"),
)

// Return table 2 is locked.
c := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1)
c.AppendInt64(0, int64(2))
rows := []chunk.Row{c.GetRow(0)}
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
selectSQL,
).Return(rows, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
selectDeltaSQL,
gomock.Eq([]interface{}{int64(2)}),
).Return([]chunk.Row{createStatsDeltaRow(1, 1, 1000)}, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
updateDeltaSQL,
gomock.Eq([]interface{}{uint64(1000), int64(1), int64(1), int64(2)}),
).Return(nil, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
updateDeltaSQL,
gomock.Eq([]interface{}{uint64(1000), int64(1), int64(1), int64(1)}),
).Return(nil, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
DeleteLockSQL,
gomock.Eq([]interface{}{int64(2)}),
).Return(nil, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
"COMMIT",
)

pidAndNames := map[int64]string{
2: "p1",
}

msg, err := RemoveLockedPartitions(
exec,
1,
"test.t1",
pidAndNames,
)
require.NoError(t, err)
require.Equal(t, "", msg)
}

func TestRemoveLockedPartitionsFailedIfTheWholeTableIsLocked(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
exec := mock.NewMockRestrictedSQLExecutor(ctrl)

// Executed SQL should be:
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
gomock.Eq("BEGIN PESSIMISTIC"),
)

// Return table 2 is locked.
c := chunk.NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1)
c.AppendInt64(0, int64(1))
rows := []chunk.Row{c.GetRow(0)}
exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
selectSQL,
).Return(rows, nil, nil)

exec.EXPECT().ExecRestrictedSQL(
gomock.All(&ctxMatcher{}),
useCurrentSession,
"COMMIT",
)

pidAndNames := map[int64]string{
2: "p1",
}

msg, err := RemoveLockedPartitions(
exec,
1,
"test.t1",
pidAndNames,
)
require.NoError(t, err)
require.Equal(t, "skip unlocking partitions of locked table: test.t1", msg)
}
12 changes: 8 additions & 4 deletions statistics/handle/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,11 +334,15 @@ func (h *Handle) dumpTableStatCountToKV(is infoschema.InfoSchema, physicalTableI
return
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
if err = updateStatsMeta(ctx, exec, statsVersion, delta, tableID, isTableLocked); err != nil {
return
// If only the partition is locked, we don't need to update the global-stats.
// We will update its global-stats when the partition is unlocked.
if isTableLocked || !isPartitionLocked {
// If it's a partitioned table and its global-stats exists, update its count and modify_count as well.
if err = updateStatsMeta(ctx, exec, statsVersion, delta, tableID, isTableLocked); err != nil {
return
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
}
affectedRows += sctx.GetSessionVars().StmtCtx.AffectedRows()
} else {
// This is a non-partitioned table.
// Check if it's locked.
Expand Down

0 comments on commit c25fd6f

Please sign in to comment.