-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
statistics: support historical stats dump partition table #40310
Changes from 2 commits
7bcf4bb
9d79894
2eeae63
7c7a5ac
0b332ce
9c891a5
921ec81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -291,6 +291,8 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n | |
} | ||
} | ||
|
||
tableIDs := map[int64]struct{}{} | ||
|
||
// save analyze results in single-thread. | ||
statsHandle := domain.GetDomain(e.ctx).StatsHandle() | ||
panicCnt := 0 | ||
|
@@ -311,6 +313,7 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n | |
continue | ||
} | ||
handleGlobalStats(needGlobalStats, globalStatsMap, results) | ||
tableIDs[results.TableID.GetStatisticsID()] = struct{}{} | ||
|
||
if err1 := statsHandle.SaveTableStatsToStorage(results, e.ctx.GetSessionVars().EnableAnalyzeSnapshot, handle.StatsMetaHistorySourceAnalyze); err1 != nil { | ||
tableID := results.TableID.TableID | ||
|
@@ -319,17 +322,20 @@ func (e *AnalyzeExec) handleResultsError(ctx context.Context, concurrency int, n | |
finishJobWithLog(e.ctx, results.Job, err) | ||
} else { | ||
finishJobWithLog(e.ctx, results.Job, nil) | ||
// Dump stats to historical storage. | ||
if err := recordHistoricalStats(e.ctx, results.TableID.TableID); err != nil { | ||
logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) | ||
} | ||
} | ||
invalidInfoSchemaStatCache(results.TableID.GetStatisticsID()) | ||
if atomic.LoadUint32(&e.ctx.GetSessionVars().Killed) == 1 { | ||
finishJobWithLog(e.ctx, results.Job, ErrQueryInterrupted) | ||
return errors.Trace(ErrQueryInterrupted) | ||
} | ||
} | ||
// Dump stats to historical storage. | ||
for tableID := range tableIDs { | ||
if err := recordHistoricalStats(e.ctx, tableID); err != nil { | ||
logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) | ||
} | ||
} | ||
|
||
return err | ||
} | ||
|
||
|
@@ -348,6 +354,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta | |
worker.run(ctx1, e.ctx.GetSessionVars().EnableAnalyzeSnapshot) | ||
}) | ||
} | ||
tableIDs := map[int64]struct{}{} | ||
panicCnt := 0 | ||
var err error | ||
for panicCnt < statsConcurrency { | ||
|
@@ -370,6 +377,7 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta | |
continue | ||
} | ||
handleGlobalStats(needGlobalStats, globalStatsMap, results) | ||
tableIDs[results.TableID.GetStatisticsID()] = struct{}{} | ||
saveResultsCh <- results | ||
} | ||
close(saveResultsCh) | ||
|
@@ -382,6 +390,12 @@ func (e *AnalyzeExec) handleResultsErrorWithConcurrency(ctx context.Context, sta | |
} | ||
err = errors.New(strings.Join(errMsg, ",")) | ||
} | ||
for tableID := range tableIDs { | ||
// Dump stats to historical storage. | ||
if err := recordHistoricalStats(e.ctx, tableID); err != nil { | ||
logutil.BgLogger().Error("record historical stats failed", zap.Error(err)) | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Though we save stats concurrently by several workers, the logic of recording historical stats is sequential. Will we improve it in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dumping historical stats is worked by multiple worker in background, thus it's not sequential |
||
return err | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2565,17 +2565,27 @@ func (h *Handle) GetPredicateColumns(tableID int64) ([]int64, error) { | |
const maxColumnSize = 6 << 20 | ||
|
||
// RecordHistoricalStatsToStorage records the given table's stats data to mysql.stats_history | ||
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo) (uint64, error) { | ||
func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model.TableInfo, physicalID int64, isPartition bool) (uint64, error) { | ||
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats) | ||
js, err := h.DumpStatsToJSON(dbName, tableInfo, nil, true) | ||
var js *JSONTable | ||
var err error | ||
if isPartition { | ||
js, err = h.tableStatsToJSON(dbName, tableInfo, physicalID, 0) | ||
} else { | ||
js, err = h.DumpStatsToJSON(dbName, tableInfo, nil, true) | ||
Comment on lines
+2573
to
+2575
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
if err != nil { | ||
return 0, errors.Trace(err) | ||
} | ||
version := uint64(0) | ||
for _, value := range js.Columns { | ||
version = uint64(*value.StatsVer) | ||
if version != 0 { | ||
break | ||
if len(js.Partitions) == 0 { | ||
version = js.Version | ||
} else { | ||
for _, p := range js.Partitions { | ||
version = p.Version | ||
if version != 0 { | ||
break | ||
} | ||
} | ||
} | ||
blocks, err := JSONTableToBlocks(js, maxColumnSize) | ||
|
@@ -2596,7 +2606,7 @@ func (h *Handle) RecordHistoricalStatsToStorage(dbName string, tableInfo *model. | |
|
||
const sql = "INSERT INTO mysql.stats_history(table_id, stats_data, seq_no, version, create_time) VALUES (%?, %?, %?, %?, %?)" | ||
for i := 0; i < len(blocks); i++ { | ||
if _, err := exec.ExecuteInternal(ctx, sql, tableInfo.ID, blocks[i], i, version, ts); err != nil { | ||
if _, err := exec.ExecuteInternal(ctx, sql, physicalID, blocks[i], i, version, ts); err != nil { | ||
return version, errors.Trace(err) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need
generateHistoricalStatsFailedCounter
here anymore?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.