From da7c47a5bed4b770508be3b9816805f76a6fc169 Mon Sep 17 00:00:00 2001 From: Jibing Li Date: Fri, 1 Mar 2024 17:34:51 +0800 Subject: [PATCH] Support column level health value. --- .../doris/analysis/ShowColumnStatsStmt.java | 4 + .../doris/datasource/InternalCatalog.java | 6 +- .../org/apache/doris/qe/SessionVariable.java | 7 ++ .../apache/doris/statistics/AnalysisInfo.java | 17 +++- .../doris/statistics/AnalysisInfoBuilder.java | 17 +++- .../apache/doris/statistics/AnalysisJob.java | 8 +- .../doris/statistics/AnalysisManager.java | 13 ++- .../apache/doris/statistics/ColStatsMeta.java | 16 ++-- .../doris/statistics/OlapAnalysisTask.java | 2 +- .../statistics/StatisticsAutoCollector.java | 95 +++++++++++++++++-- .../statistics/StatisticsJobAppender.java | 28 ++++-- .../doris/statistics/TableStatsMeta.java | 33 +++---- .../doris/statistics/util/StatisticsUtil.java | 10 ++ .../doris/statistics/AnalysisJobTest.java | 4 +- .../doris/statistics/AnalysisManagerTest.java | 2 +- 15 files changed, 198 insertions(+), 64 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java index 37be76b20df09d..749bfa7d360e44 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowColumnStatsStmt.java @@ -61,6 +61,8 @@ public class ShowColumnStatsStmt extends ShowStmt { .add("trigger") .add("query_times") .add("updated_time") + .add("update_rows") + .add("last_analyze_row_count") .build(); private final TableName tableName; @@ -160,6 +162,8 @@ public ShowResultSet constructResultSet(List, ColumnSt row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType)); row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes)); row.add(String.valueOf(p.second.updatedTime)); + row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.updatedRows)); + row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.rowCount)); result.add(row); }); return new ShowResultSet(getMetaData(), result); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 95abc9e06c478b..61a1073da3466a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3116,7 +3116,6 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti rowsToTruncate += partition.getBaseIndex().getRowCount(); } } else { - rowsToTruncate = olapTable.getRowCount(); for (Partition partition : olapTable.getPartitions()) { // If need absolutely correct, should check running txn here. // But if the txn is in prepare state, cann't known which partitions had load data. @@ -3125,6 +3124,7 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti } origPartitions.put(partition.getName(), partition.getId()); partitionsDistributionInfo.put(partition.getId(), partition.getDistributionInfo()); + rowsToTruncate += partition.getBaseIndex().getRowCount(); } } // if table currently has no partitions, this sql like empty command and do nothing, should return directly. @@ -3285,10 +3285,8 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti if (truncateEntireTable) { // Drop the whole table stats after truncate the entire table Env.getCurrentEnv().getAnalysisManager().dropStats(olapTable); - } else { - // Update the updated rows in table stats after truncate some partitions. - Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); } + Env.getCurrentEnv().getAnalysisManager().updateUpdatedRows(updateRecords); LOG.info("finished to truncate table {}, partitions: {}", tblRef.getName().toSql(), tblRef.getPartitionNames()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 5f7e8ebc32ed3e..a8830f6fc313bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -468,6 +468,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_AUTO_ANALYZE = "enable_auto_analyze"; + public static final String ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG = "enable_auto_analyze_internal_catalog"; + public static final String AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD = "auto_analyze_table_width_threshold"; public static final String FASTER_FLOAT_CONVERT = "faster_float_convert"; @@ -1492,6 +1494,11 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) { flag = VariableMgr.GLOBAL) public boolean enableAutoAnalyze = true; + @VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG, + description = {"临时参数,收否自动收集所有内表", "Temp variable, enable to auto collect all OlapTable."}, + flag = VariableMgr.GLOBAL) + public boolean enableAutoAnalyzeInternalCatalog = false; + @VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD, description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集", "Maximum table width to enable auto analyze, " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index c707107e0e0fb6..0c5047a53c5d06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -188,8 +188,11 @@ public enum ScheduleType { @SerializedName("endTime") public long endTime; - @SerializedName("emptyJob") - public final boolean emptyJob; + @SerializedName("rowCount") + public final long rowCount; + + @SerializedName("updateRows") + public final long updateRows; /** * * Used to store the newest partition version of tbl when creating this job. @@ -206,7 +209,8 @@ public AnalysisInfo(long jobId, long taskId, List taskIds, long catalogId, long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition, boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull, - boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject) { + boolean usingSqlForPartitionColumn, long tblUpdateTime, long rowCount, boolean userInject, + long updateRows) { this.jobId = jobId; this.taskId = taskId; this.taskIds = taskIds; @@ -242,8 +246,9 @@ public AnalysisInfo(long jobId, long taskId, List taskIds, long catalogId, this.forceFull = forceFull; this.usingSqlForPartitionColumn = usingSqlForPartitionColumn; this.tblUpdateTime = tblUpdateTime; - this.emptyJob = emptyJob; + this.rowCount = rowCount; this.userInject = userInject; + this.updateRows = updateRows; } @Override @@ -285,7 +290,9 @@ public String toString() { } sj.add("forceFull: " + forceFull); sj.add("usingSqlForPartitionColumn: " + usingSqlForPartitionColumn); - sj.add("emptyJob: " + emptyJob); + sj.add("rowCount: " + rowCount); + sj.add("userInject: " + userInject); + sj.add("updateRows: " + updateRows); return sj.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index 22f3d22b3ce77c..527d503fd522a5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -62,8 +62,9 @@ public class AnalysisInfoBuilder { private boolean forceFull; private boolean usingSqlForPartitionColumn; private long tblUpdateTime; - private boolean emptyJob; + private long rowCount; private boolean userInject; + private long updateRows; public AnalysisInfoBuilder() { } @@ -101,8 +102,9 @@ public AnalysisInfoBuilder(AnalysisInfo info) { forceFull = info.forceFull; usingSqlForPartitionColumn = info.usingSqlForPartitionColumn; tblUpdateTime = info.tblUpdateTime; - emptyJob = info.emptyJob; + rowCount = info.rowCount; userInject = info.userInject; + updateRows = info.updateRows; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -265,8 +267,8 @@ public AnalysisInfoBuilder setTblUpdateTime(long tblUpdateTime) { return this; } - public AnalysisInfoBuilder setEmptyJob(boolean emptyJob) { - this.emptyJob = emptyJob; + public AnalysisInfoBuilder setRowCount(long rowCount) { + this.rowCount = rowCount; return this; } @@ -275,12 +277,17 @@ public AnalysisInfoBuilder setUserInject(boolean userInject) { return this; } + public AnalysisInfoBuilder setUpdateRows(long updateRows) { + this.updateRows = updateRows; + return this; + } + public AnalysisInfo build() { return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType, externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount, - cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject); + cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, rowCount, userInject, updateRows); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java index 22eab37f920bdd..6846abea431476 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java @@ -84,14 +84,12 @@ public synchronized void rowCountDone(BaseAnalysisTask task) { protected void markOneTaskDone() { if (queryingTask.isEmpty()) { try { - writeBuf(); - updateTaskState(AnalysisState.FINISHED, "Cost time in sec: " - + (System.currentTimeMillis() - start) / 1000); + flushBuffer(); } finally { deregisterJob(); } } else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) { - writeBuf(); + flushBuffer(); } } @@ -115,7 +113,7 @@ public void updateTaskState(AnalysisState state, String msg) { } } - protected void writeBuf() { + protected void flushBuffer() { if (killed) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index e7ab342d09157d..0ff9e3a9e00fd1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -413,7 +413,10 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio infoBuilder.setColToPartitions(colToPartitions); infoBuilder.setTaskIds(Lists.newArrayList()); infoBuilder.setTblUpdateTime(table.getUpdateTime()); - infoBuilder.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0); + long rowCount = table.getRowCount(); + infoBuilder.setRowCount(rowCount); + TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId()); + infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()); return infoBuilder.build(); } @@ -569,7 +572,7 @@ public void updateTableStats(AnalysisInfo jobInfo) { } TableStatsMeta tableStats = findTableStatsStatus(tbl.getId()); if (tableStats == null) { - updateTableStatsStatus(new TableStatsMeta(jobInfo.emptyJob ? 0 : tbl.getRowCount(), jobInfo, tbl)); + updateTableStatsStatus(new TableStatsMeta(jobInfo.rowCount, jobInfo, tbl)); } else { tableStats.update(jobInfo, tbl); logCreateTableStats(tableStats); @@ -802,7 +805,7 @@ private BaseAnalysisTask createTask(AnalysisInfo analysisInfo) throws DdlExcepti analysisInfo.dbId, analysisInfo.tblId); return table.createAnalysisTask(analysisInfo); } catch (Throwable t) { - LOG.warn("Failed to find table", t); + LOG.warn("Failed to create task.", t); throw new DdlException("Failed to create task", t); } } @@ -1141,10 +1144,12 @@ public boolean canSample(TableIf table) { public void updateColumnUsedInPredicate(Set slotReferences) { + LOG.info("Add slots to high priority queues."); updateColumn(slotReferences, highPriorityColumns); } public void updateQueriedColumn(Collection slotReferences) { + LOG.info("Add slots to mid priority queues."); updateColumn(slotReferences, midPriorityColumns); } @@ -1164,6 +1169,8 @@ protected void updateColumn(Collection slotReferences, Queue partitionNames = info.colToPartitions.get(info.colName); - if ((info.emptyJob && info.analysisMethod.equals(AnalysisInfo.AnalysisMethod.SAMPLE)) + if ((info.rowCount == 0 && info.analysisMethod.equals(AnalysisInfo.AnalysisMethod.SAMPLE)) || partitionNames == null || partitionNames.isEmpty()) { if (partitionNames == null) { LOG.warn("Table {}.{}.{}, partitionNames for column {} is null. ColToPartitions:[{}]", diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java index 19b9c69db085bf..c498881bfbf742 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsAutoCollector.java @@ -23,6 +23,8 @@ import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.datasource.hive.HMSExternalTable.DLAType; import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod; import org.apache.doris.statistics.AnalysisInfo.JobType; import org.apache.doris.statistics.AnalysisInfo.ScheduleType; @@ -33,6 +35,8 @@ import java.time.LocalTime; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -60,9 +64,16 @@ protected void collect() { } try { TableIf table = job.getKey(); + if (!supportAutoAnalyze(table)) { + continue; + } Set columns = job.getValue() .stream() - .filter(c -> needAnalyzeColumn(table, c)) + .filter(c -> { + boolean needAnalyzeColumn = needAnalyzeColumn(table, c); + LOG.info("Need analyze column " + c + " ? " + needAnalyzeColumn); + return needAnalyzeColumn; + }) .collect(Collectors.toSet()); processOneJob(table, columns); } catch (Exception e) { @@ -100,22 +111,92 @@ protected Optional>> fetchJobFromMap(Map columns) throws DdlException { - Set collect = columns.stream().filter(c -> needAnalyzeColumn(table, c)).collect(Collectors.toSet()); - if (collect.isEmpty()) { + appendPartitionColumns(table, columns); + if (columns.isEmpty()) { return; } AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns); + LOG.info("Analyze job : {}", analyzeJob.toString()); createSystemAnalysisJob(analyzeJob); } + protected void appendPartitionColumns(TableIf table, Set columns) { + if (!(table instanceof OlapTable)) { + return; + } + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + if (tableStatsStatus != null && tableStatsStatus.newPartitionLoaded.get()) { + OlapTable olapTable = (OlapTable) table; + columns.addAll(olapTable.getPartitionNames()); + } + } + protected boolean needAnalyzeColumn(TableIf table, String column) { - //TODO: Calculate column health value. - return true; + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + if (tableStatsStatus == null) { + return true; + } + if (tableStatsStatus.userInjected) { + return false; + } + ColStatsMeta columnStatsMeta = tableStatsStatus.findColumnStatsMeta(column); + if (columnStatsMeta == null) { + return true; + } + if (table instanceof OlapTable) { + long currentUpdatedRows = tableStatsStatus.updatedRows.get(); + long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows; + if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) { + return true; + } + OlapTable olapTable = (OlapTable) table; + if (tableStatsStatus.newPartitionLoaded.get() && olapTable.isPartitionColumn(column)) { + return true; + } + if (columnStatsMeta.rowCount == 0 && olapTable.getRowCount() > 0) { + return true; + } + if (currentUpdatedRows == lastAnalyzeUpdateRows) { + return false; + } + double healthValue = ((double) (currentUpdatedRows - lastAnalyzeUpdateRows) + / (double) currentUpdatedRows) * 100.0; + LOG.info("Column " + column + " health value is " + healthValue); + return healthValue < StatisticsUtil.getTableStatsHealthThreshold(); + } else { + if (!(table instanceof HMSExternalTable)) { + return false; + } + HMSExternalTable hmsTable = (HMSExternalTable) table; + if (!hmsTable.getDlaType().equals(DLAType.HIVE)) { + return false; + } + return System.currentTimeMillis() + - tableStatsStatus.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis(); + } + } + + protected boolean supportAutoAnalyze(TableIf tableIf) { + if (tableIf == null) { + return false; + } + return tableIf instanceof OlapTable + || tableIf instanceof HMSExternalTable + && ((HMSExternalTable) tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE); } protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set columns) { AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes() ? AnalysisMethod.SAMPLE : AnalysisMethod.FULL; + AnalysisManager manager = Env.getServingEnv().getAnalysisManager(); + TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId()); + long rowCount = table.getRowCount(); + Map> colToPartitions = new HashMap<>(); + Set dummyPartition = new HashSet<>(); + dummyPartition.add("dummy partition"); + columns.stream().forEach(c -> colToPartitions.put(c, dummyPartition)); return new AnalysisInfoBuilder() .setJobId(Env.getCurrentEnv().getNextId()) .setCatalogId(table.getDatabase().getCatalog().getId()) @@ -133,7 +214,9 @@ protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set columns .setLastExecTimeInMs(System.currentTimeMillis()) .setJobType(JobType.SYSTEM) .setTblUpdateTime(table.getUpdateTime()) - .setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0) + .setRowCount(rowCount) + .setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get()) + .setColToPartitions(colToPartitions) .build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java index 73d0d1340ad2bb..71bb71d3cda350 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobAppender.java @@ -19,6 +19,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.util.MasterDaemon; @@ -28,7 +29,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -70,34 +71,44 @@ protected void runAfterCatalogReady() { protected void appendJobs() { AnalysisManager manager = Env.getCurrentEnv().getAnalysisManager(); + // LOG.info("Append column to high priority job map."); appendColumnsToJobs(manager.highPriorityColumns, manager.highPriorityJobs); + // LOG.info("Append column to mid priority job map."); appendColumnsToJobs(manager.midPriorityColumns, manager.midPriorityJobs); - appendToLowQueue(manager.lowPriorityJobs); + if (StatisticsUtil.enableAutoAnalyzeInternalCatalog()) { + // LOG.info("Append column to low priority job map."); + appendToLowQueue(manager.lowPriorityJobs); + } } protected void appendColumnsToJobs(Queue columnQueue, Map> jobsMap) { int size = columnQueue.size(); for (int i = 0; i < size; i++) { HighPriorityColumn column = columnQueue.poll(); + LOG.info("Process column " + column.tblId + "." + column.colName); TableIf table = StatisticsUtil.findTable(column.catalogId, column.dbId, column.tblId); synchronized (jobsMap) { // If job map reach the upper limit, stop putting new jobs. if (!jobsMap.containsKey(table) && jobsMap.size() >= JOB_MAP_SIZE) { + LOG.info("Job map full."); break; } if (jobsMap.containsKey(table)) { jobsMap.get(table).add(column.colName); } else { - jobsMap.put(table, Collections.singleton(column.colName)); + HashSet columns = new HashSet<>(); + columns.add(column.colName); + jobsMap.put(table, columns); } + LOG.info("Column " + column.tblId + "." + column.colName + " added"); } } } - protected void appendToLowQueue(Map> jobsMap) { - + protected void appendToLowQueue(Map> jobsMap) { InternalCatalog catalog = Env.getCurrentInternalCatalog(); List sortedDbs = catalog.getDbIds().stream().sorted().collect(Collectors.toList()); + int batchSize = 100; for (long dbId : sortedDbs) { if (dbId < currentDbId || StatisticConstants.SYSTEM_DBS.contains(catalog.getDbNullable(dbId).getFullName())) { @@ -108,11 +119,11 @@ protected void appendToLowQueue(Map> jobsMap) { List tables = db.get().getTables().stream() .sorted((t1, t2) -> (int) (t1.getId() - t2.getId())).collect(Collectors.toList()); for (Table t : tables) { - if (t.getId() <= currentTableId) { + if (!(t instanceof OlapTable) || t.getId() <= currentTableId) { continue; } synchronized (jobsMap) { - // If job map reach the upper limit, stop putting new jobs. + // If job map reach the upper limit, stop adding new jobs. if (!jobsMap.containsKey(t) && jobsMap.size() >= JOB_MAP_SIZE) { return; } @@ -126,6 +137,9 @@ protected void appendToLowQueue(Map> jobsMap) { } } currentTableId = t.getId(); + if (--batchSize <= 0) { + return; + } } } // All tables have been processed once, reset for the next loop. diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java index 9231c6a2bc7cd1..96ca0aab54c4cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/TableStatsMeta.java @@ -126,11 +126,6 @@ public Set analyzeColumns() { return colNameToColStatsMeta.keySet(); } - public void reset() { - updatedTime = 0; - colNameToColStatsMeta.values().forEach(ColStatsMeta::clear); - } - public void update(AnalysisInfo analyzedJob, TableIf tableIf) { updatedTime = analyzedJob.tblUpdateTime; userInjected = analyzedJob.userInject; @@ -145,34 +140,34 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) { for (String col : cols) { ColStatsMeta colStatsMeta = colNameToColStatsMeta.get(col); if (colStatsMeta == null) { - colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime, - analyzedJob.analysisMethod, analyzedJob.analysisType, analyzedJob.jobType, 0)); + colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime, analyzedJob.analysisMethod, + analyzedJob.analysisType, analyzedJob.jobType, 0, analyzedJob.rowCount, + analyzedJob.updateRows)); } else { colStatsMeta.updatedTime = updatedTime; colStatsMeta.analysisType = analyzedJob.analysisType; colStatsMeta.analysisMethod = analyzedJob.analysisMethod; colStatsMeta.jobType = analyzedJob.jobType; + colStatsMeta.updatedRows = analyzedJob.updateRows; + colStatsMeta.rowCount = analyzedJob.rowCount; } } jobType = analyzedJob.jobType; if (tableIf != null) { if (tableIf instanceof OlapTable) { - rowCount = analyzedJob.emptyJob ? 0 : tableIf.getRowCount(); - } - if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet() - .containsAll(tableIf.getBaseSchema().stream() - .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) - .map(Column::getName).collect(Collectors.toSet()))) { - updatedRows.set(0); - newPartitionLoaded.set(false); - } - if (tableIf instanceof OlapTable) { + rowCount = analyzedJob.rowCount; PartitionInfo partitionInfo = ((OlapTable) tableIf).getPartitionInfo(); - if (partitionInfo != null && analyzedJob.colToPartitions.keySet() + if (analyzedJob.rowCount != 0 && partitionInfo != null && analyzedJob.colToPartitions.keySet() .containsAll(partitionInfo.getPartitionColumns().stream() - .map(Column::getName).collect(Collectors.toSet()))) { + .map(Column::getName).collect(Collectors.toSet()))) { newPartitionLoaded.set(false); } + if (analyzedJob.rowCount != 0 && analyzedJob.colToPartitions.keySet() + .containsAll(tableIf.getBaseSchema().stream() + .filter(c -> !StatisticsUtil.isUnsupportedType(c.getType())) + .map(Column::getName).collect(Collectors.toSet()))) { + userInjected = false; + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 8ee08d57e69a6a..65c2ee9e6da013 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -867,6 +867,16 @@ public static boolean enableAutoAnalyze() { return false; } + public static boolean enableAutoAnalyzeInternalCatalog() { + try { + return findConfigFromGlobalSessionVar( + SessionVariable.ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG).enableAutoAnalyzeInternalCatalog; + } catch (Exception e) { + LOG.warn("Fail to get value of enable auto analyze internal catalog, return false by default", e); + } + return false; + } + public static int getInsertMergeCount() { try { return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT) diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index 1bf2041bb4f12c..cb2637d5cf685a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -184,7 +184,7 @@ protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exce protected void syncLoadStats() { } }; - job.writeBuf(); + job.flushBuffer(); Assertions.assertEquals(0, job.queryFinished.size()); } @@ -210,7 +210,7 @@ protected void syncLoadStats() { job.buf.add(new ColStatsData()); job.queryFinished = new HashSet<>(); job.queryFinished.add(task2); - job.writeBuf(); + job.flushBuffer(); Assertions.assertEquals(0, job.queryFinished.size()); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java index f8a77fe06db754..4e8bbfe5aff17b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisManagerTest.java @@ -306,7 +306,7 @@ public List getBaseSchema() { Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2)); TableStatsMeta stats3 = new TableStatsMeta(0, new AnalysisInfoBuilder() - .setColToPartitions(new HashMap<>()).setEmptyJob(true).setColName("col1").build(), olapTable); + .setColToPartitions(new HashMap<>()).setRowCount(0).setColName("col1").build(), olapTable); Assertions.assertTrue(olapTable.needReAnalyzeTable(stats3)); }