statistics: process DML changes in the new PQ (#56383)
ref #55906
Rustin170506 authored Oct 8, 2024
1 parent 7daf026 commit 1455d45
Showing 14 changed files with 728 additions and 131 deletions.
@@ -54,7 +54,7 @@ go_test(
"static_partitioned_table_analysis_job_test.go",
],
flaky = True,
shard_count = 32,
shard_count = 34,
deps = [
":priorityqueue",
"//pkg/meta/model",
@@ -239,6 +239,16 @@ func (j *TestJob) Analyze(statsHandle types.StatsHandle, sysProcTracker sysproct
panic("unimplemented")
}

// RegisterSuccessHook implements AnalysisJob.
func (j *TestJob) RegisterSuccessHook(hook priorityqueue.JobHook) {
panic("unimplemented")
}

// RegisterFailureHook implements AnalysisJob.
func (j *TestJob) RegisterFailureHook(hook priorityqueue.JobHook) {
panic("unimplemented")
}

// GetWeight implements AnalysisJob.
func (j *TestJob) GetWeight() float64 {
panic("unimplemented")
@@ -44,6 +44,9 @@ type DynamicPartitionedTableAnalysisJob struct {
// For example, the user may analyze some partitions manually, and we don't want to analyze them again.
PartitionIndexes map[string][]string

successHook JobHook
failureHook JobHook

TableSchema string
GlobalTableName string
// This will analyze all indexes and columns of the specified partitions.
@@ -94,6 +97,12 @@ func (j *DynamicPartitionedTableAnalysisJob) Analyze(
statsHandle statstypes.StatsHandle,
sysProcTracker sysproctrack.Tracker,
) error {
defer func() {
if j.successHook != nil {
j.successHook(j)
}
}()

return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error {
switch j.getAnalyzeType() {
case analyzeDynamicPartition:
@@ -105,6 +114,16 @@
})
}

// RegisterSuccessHook registers a hook function that will be called after the job is marked as successful.
func (j *DynamicPartitionedTableAnalysisJob) RegisterSuccessHook(hook JobHook) {
j.successHook = hook
}

// RegisterFailureHook registers a hook function that will be called after the job is marked as failed.
func (j *DynamicPartitionedTableAnalysisJob) RegisterFailureHook(hook JobHook) {
j.failureHook = hook
}

// GetIndicators returns the indicators of the table.
func (j *DynamicPartitionedTableAnalysisJob) GetIndicators() Indicators {
return j.Indicators
@@ -136,6 +155,9 @@ func (j *DynamicPartitionedTableAnalysisJob) IsValidToAnalyze(
j.GlobalTableName,
partitions...,
); !valid {
if j.failureHook != nil {
j.failureHook(j)
}
return false, failReason
}
}
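The hook wiring added here follows a simple shape, repeated for the non-partitioned job further down: the success hook is invoked from a defer wrapped around Analyze, and the failure hook is invoked when IsValidToAnalyze rejects the job. Below is a minimal, self-contained sketch of that flow; every type and name in it is illustrative rather than the real TiDB API.

```go
package main

import "fmt"

// jobHook mirrors the shape of priorityqueue.JobHook: a callback that
// receives the job it was registered on.
type jobHook func(job *fakeJob)

// fakeJob is an illustrative stand-in for an analysis job carrying the
// two hooks added in this commit; none of the real TiDB types are used.
type fakeJob struct {
	table       string
	successHook jobHook
	failureHook jobHook
}

// analyze mirrors the pattern in Analyze above: the success hook runs from
// a defer wrapped around the whole body.
func (j *fakeJob) analyze() error {
	defer func() {
		if j.successHook != nil {
			j.successHook(j)
		}
	}()
	// ... the real job would run the ANALYZE statement here ...
	return nil
}

// checkValid mirrors IsValidToAnalyze: an invalid job fires the failure
// hook before the reason is handed back to the caller.
func (j *fakeJob) checkValid(valid bool, reason string) (bool, string) {
	if !valid {
		if j.failureHook != nil {
			j.failureHook(j)
		}
		return false, reason
	}
	return true, ""
}

func main() {
	j := &fakeJob{table: "t1"}
	j.successHook = func(job *fakeJob) { fmt.Println("finished:", job.table) }
	j.failureHook = func(job *fakeJob) { fmt.Println("skipped:", job.table) }

	if ok, reason := j.checkValid(true, ""); ok {
		_ = j.analyze() // prints "finished: t1"
	} else {
		fmt.Println("not analyzed:", reason)
	}
}
```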
9 changes: 9 additions & 0 deletions pkg/statistics/handle/autoanalyze/priorityqueue/job.go
@@ -43,6 +43,9 @@ type Indicators struct {
LastAnalysisDuration time.Duration
}

// JobHook is the hook function that will be called after the job is completed.
type JobHook func(job AnalysisJob)

// AnalysisJob is the interface for the analysis job.
type AnalysisJob interface {
// IsValidToAnalyze checks whether the table is valid to analyze.
@@ -77,6 +80,12 @@ type AnalysisJob interface {
// GetTableID gets the table ID of the job.
GetTableID() int64

// RegisterSuccessHook registers a hook function that will be called after the job is marked as successful.
RegisterSuccessHook(hook JobHook)

// RegisterFailureHook registers a hook function that will be called after the job is marked as failed.
RegisterFailureHook(hook JobHook)

fmt.Stringer
}

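The JobHook callback added to the AnalysisJob interface is what lets the new priority queue observe job outcomes without the jobs knowing anything about queue internals. A hedged sketch of that consumer side, using a trimmed-down interface and a stub job (none of this is the real queue code):

```go
package main

import "fmt"

// analysisJob is a trimmed-down version of the AnalysisJob interface,
// keeping only the methods relevant to this change.
type analysisJob interface {
	RegisterSuccessHook(hook func(job analysisJob))
	RegisterFailureHook(hook func(job analysisJob))
	GetTableID() int64
}

// stubJob is a minimal implementation used only for this demo.
type stubJob struct {
	id          int64
	successHook func(job analysisJob)
	failureHook func(job analysisJob)
}

func (s *stubJob) RegisterSuccessHook(h func(job analysisJob)) { s.successHook = h }
func (s *stubJob) RegisterFailureHook(h func(job analysisJob)) { s.failureHook = h }
func (s *stubJob) GetTableID() int64                           { return s.id }

// tracker sketches the queue-side bookkeeping the hooks make possible:
// it learns which table IDs finished and which were rejected.
type tracker struct {
	succeeded []int64
	failed    []int64
}

// watch registers both hooks on a job so the tracker observes its outcome.
func (t *tracker) watch(j analysisJob) {
	j.RegisterSuccessHook(func(job analysisJob) {
		t.succeeded = append(t.succeeded, job.GetTableID())
	})
	j.RegisterFailureHook(func(job analysisJob) {
		t.failed = append(t.failed, job.GetTableID())
	})
}

func main() {
	tr := &tracker{}
	j := &stubJob{id: 42}
	tr.watch(j)
	j.successHook(j)                     // simulate the job completing
	fmt.Println(tr.succeeded, tr.failed) // [42] []
}
```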
@@ -35,6 +35,8 @@ const (

// NonPartitionedTableAnalysisJob is a TableAnalysisJob for analyzing the physical table.
type NonPartitionedTableAnalysisJob struct {
successHook JobHook
failureHook JobHook
TableSchema string
TableName string
// This is only for newly added indexes.
@@ -79,6 +81,12 @@ func (j *NonPartitionedTableAnalysisJob) Analyze(
statsHandle statstypes.StatsHandle,
sysProcTracker sysproctrack.Tracker,
) error {
defer func() {
if j.successHook != nil {
j.successHook(j)
}
}()

return statsutil.CallWithSCtx(statsHandle.SPool(), func(sctx sessionctx.Context) error {
switch j.getAnalyzeType() {
case analyzeTable:
@@ -90,6 +98,16 @@
})
}

// RegisterSuccessHook registers a hook function that will be called after the job is marked as successful.
func (j *NonPartitionedTableAnalysisJob) RegisterSuccessHook(hook JobHook) {
j.successHook = hook
}

// RegisterFailureHook registers a hook function that will be called after the job is marked as failed.
func (j *NonPartitionedTableAnalysisJob) RegisterFailureHook(hook JobHook) {
j.failureHook = hook
}

// HasNewlyAddedIndex checks whether the table has newly added indexes.
func (j *NonPartitionedTableAnalysisJob) HasNewlyAddedIndex() bool {
return len(j.Indexes) > 0
@@ -105,6 +123,10 @@ func (j *NonPartitionedTableAnalysisJob) IsValidToAnalyze(
j.TableSchema,
j.TableName,
); !valid {
if j.failureHook != nil {
j.failureHook(j)
}

return false, failReason
}

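One way to exercise both hook paths in a unit test, assuming nothing beyond the pattern shown in the two job types above (this is not one of the tests added by this commit, and the package name is illustrative):

```go
package priorityqueue_test

import "testing"

// hookedJob is a tiny illustrative job that mimics the hook behaviour of the
// real analysis jobs: success fires from a defer, failure fires on rejection.
type hookedJob struct {
	successHook func()
	failureHook func()
}

func (j *hookedJob) analyze() {
	defer func() {
		if j.successHook != nil {
			j.successHook()
		}
	}()
	// analysis work would happen here
}

func (j *hookedJob) reject() {
	if j.failureHook != nil {
		j.failureHook()
	}
}

// TestJobHooks checks that exactly one hook fires per outcome.
func TestJobHooks(t *testing.T) {
	var gotSuccess, gotFailure bool
	j := &hookedJob{
		successHook: func() { gotSuccess = true },
		failureHook: func() { gotFailure = true },
	}

	j.analyze()
	if !gotSuccess || gotFailure {
		t.Fatalf("after analyze: success=%v failure=%v, want true/false", gotSuccess, gotFailure)
	}

	gotSuccess, gotFailure = false, false
	j.reject()
	if gotSuccess || !gotFailure {
		t.Fatalf("after reject: success=%v failure=%v, want false/true", gotSuccess, gotFailure)
	}
}
```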
8 changes: 0 additions & 8 deletions pkg/statistics/handle/autoanalyze/priorityqueue/queue.go
@@ -17,7 +17,6 @@ package priorityqueue
import (
"container/heap"
"context"
"time"

"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/model"
@@ -38,7 +37,6 @@ type PushJobFunc func(job AnalysisJob) error
func FetchAllTablesAndBuildAnalysisJobs(
sctx sessionctx.Context,
parameters map[string]string,
autoAnalysisTimeWindow AutoAnalysisTimeWindow,
statsHandle statstypes.StatsHandle,
jobFunc PushJobFunc,
) error {
@@ -62,12 +60,6 @@ func FetchAllTablesAndBuildAnalysisJobs(

dbs := is.AllSchemaNames()
for _, db := range dbs {
// Sometimes the tables are too many. Auto-analyze will take too much time on it.
// so we need to check the available time.
if !autoAnalysisTimeWindow.IsWithinTimeWindow(time.Now()) {
return nil
}

// Ignore the memory and system database.
if util.IsMemOrSysDB(db.L) {
continue
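With this hunk, FetchAllTablesAndBuildAnalysisJobs no longer receives an AutoAnalysisTimeWindow and no longer stops iterating schemas when the window closes; this excerpt does not show where that responsibility moves. For reference, here is a self-contained sketch of the kind of guard that was removed, with an illustrative window type standing in for TiDB's:

```go
package main

import (
	"fmt"
	"time"
)

// window is an illustrative stand-in for AutoAnalysisTimeWindow: a daily
// interval in minutes since midnight (assumed not to cross midnight).
type window struct {
	startMin, endMin int
}

// isWithin reports whether t's local clock time falls inside the window.
func (w window) isWithin(t time.Time) bool {
	h, m, _ := t.Clock()
	cur := h*60 + m
	return cur >= w.startMin && cur < w.endMin
}

func main() {
	w := window{startMin: 1 * 60, endMin: 5 * 60} // 01:00 to 05:00
	dbs := []string{"db1", "db2", "db3"}
	for _, db := range dbs {
		// This mirrors the deleted per-database guard: with many schemas,
		// job building can outlast the auto-analyze window, so the loop
		// bailed out as soon as the window closed.
		if !w.isWithin(time.Now()) {
			fmt.Println("outside the auto-analyze time window, stopping")
			return
		}
		fmt.Println("building analysis jobs for", db)
	}
}
```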