Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](stats) fix auto collector always create sample job no matter the table size #26968

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ public enum ScheduleType {

@SerializedName("endTime")
public long endTime;
/**
*
* Used to store the newest partition version of tbl when creating this job.
* This variables would be saved by table stats meta.
*/
public final long tblUpdateTime;
Kikyou1997 marked this conversation as resolved.
Show resolved Hide resolved

public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
Expand All @@ -195,7 +201,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn) {
boolean usingSqlForPartitionColumn, long tblUpdateTime) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -230,6 +236,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
}
this.forceFull = forceFull;
this.usingSqlForPartitionColumn = usingSqlForPartitionColumn;
this.tblUpdateTime = tblUpdateTime;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class AnalysisInfoBuilder {
private boolean forceFull;
private boolean usingSqlForPartitionColumn;

private long tblUpdateTime;

public AnalysisInfoBuilder() {
}

Expand Down Expand Up @@ -97,6 +99,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
cronExpression = info.cronExpression;
forceFull = info.forceFull;
usingSqlForPartitionColumn = info.usingSqlForPartitionColumn;
tblUpdateTime = info.tblUpdateTime;
}

public AnalysisInfoBuilder setJobId(long jobId) {
Expand Down Expand Up @@ -254,45 +257,17 @@ public AnalysisInfoBuilder setUsingSqlForPartitionColumn(boolean usingSqlForPart
return this;
}

public AnalysisInfoBuilder setTblUpdateTime(long tblUpdateTime) {
this.tblUpdateTime = tblUpdateTime;
return this;
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForPartitionColumn);
}

public AnalysisInfoBuilder copy() {
return new AnalysisInfoBuilder()
.setJobId(jobId)
.setTaskId(taskId)
.setTaskIds(taskIds)
.setCatalogId(catalogId)
.setDBId(dbId)
.setTblId(tblId)
.setColToPartitions(colToPartitions)
.setColName(colName)
.setIndexId(indexId)
.setJobType(jobType)
.setAnalysisMode(analysisMode)
.setAnalysisMethod(analysisMethod)
.setAnalysisType(analysisType)
.setSamplePercent(samplePercent)
.setSampleRows(sampleRows)
.setPeriodTimeInMs(periodTimeInMs)
.setMaxBucketNum(maxBucketNum)
.setMessage(message)
.setLastExecTimeInMs(lastExecTimeInMs)
.setTimeCostInMs(timeCostInMs)
.setState(state)
.setScheduleType(scheduleType)
.setExternalTableLevelTask(externalTableLevelTask)
.setSamplingPartition(samplingPartition)
.setPartitionOnly(partitionOnly)
.setAllPartition(isAllPartition)
.setPartitionCount(partitionCount)
.setCronExpression(cronExpression)
.setForceFull(forceFull)
.setUsingSqlForPartitionColumn(usingSqlForPartitionColumn);
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
Expand Down Expand Up @@ -505,7 +504,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
partitionNames, analysisType);
infoBuilder.setColToPartitions(colToPartitions);
infoBuilder.setTaskIds(Lists.newArrayList());

infoBuilder.setTblUpdateTime(table.getUpdateTime());
return infoBuilder.build();
}

Expand Down Expand Up @@ -601,9 +600,9 @@ public void updateTableStats(AnalysisInfo jobInfo) {
}
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
updateTableStatsStatus(new TableStatsMeta(tbl.getId(), tbl.estimatedRowCount(), jobInfo));
updateTableStatsStatus(new TableStatsMeta(tbl.estimatedRowCount(), jobInfo, tbl));
} else {
tableStats.updateByJob(jobInfo);
tableStats.update(jobInfo, tbl);
logCreateTableStats(tableStats);
}

Expand Down Expand Up @@ -1005,21 +1004,6 @@ public void registerSysJob(AnalysisInfo jobInfo, Map<Long, BaseAnalysisTask> tas
analysisJobIdToTaskMap.put(jobInfo.jobId, taskInfos);
}

@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
TableStatsMeta tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
}
return table.getPartitionNames().stream()
.map(table::getPartition)
.filter(Partition::hasData)
.filter(partition ->
partition.getVisibleVersionTime() >= tableStats.updatedTime).map(Partition::getName)
.collect(Collectors.toSet());
}

protected void logAutoJob(AnalysisInfo autoJob) {
Env.getCurrentEnv().getEditLog().logAutoJob(autoJob);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ private void getTableStats() throws Exception {
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(
new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
new TableStatsMeta(Long.parseLong(rowCount), info, tbl));
job.rowCountDone(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private void getTableStats() throws Exception {
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
.updateTableStatsStatus(new TableStatsMeta(Long.parseLong(rowCount), info, table));
job.rowCountDone(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,15 @@ protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
.setSampleRows(StatisticsUtil.getHugeTableSampleRows())
.setSampleRows(analysisMethod.equals(AnalysisMethod.SAMPLE)
? StatisticsUtil.getHugeTableSampleRows() : -1)
.setScheduleType(ScheduleType.AUTOMATIC)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())
.setLastExecTimeInMs(System.currentTimeMillis())
.setJobType(JobType.SYSTEM).build();
.setJobType(JobType.SYSTEM)
.setTblUpdateTime(table.getUpdateTime())
.build();
analysisInfos.add(jobInfo);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
Expand Down Expand Up @@ -64,11 +66,11 @@ public class TableStatsMeta implements Writable {

// It's necessary to store these fields separately from AnalysisInfo, since the lifecycle between AnalysisInfo
// and TableStats is quite different.
public TableStatsMeta(long tblId, long rowCount, AnalysisInfo analyzedJob) {
this.tblId = tblId;
public TableStatsMeta(long rowCount, AnalysisInfo analyzedJob, TableIf table) {
this.tblId = table.getId();
this.idxId = -1;
this.rowCount = rowCount;
updateByJob(analyzedJob);
update(analyzedJob, table);
}

@Override
Expand Down Expand Up @@ -112,8 +114,8 @@ public void reset() {
colNameToColStatsMeta.values().forEach(ColStatsMeta::clear);
}

public void updateByJob(AnalysisInfo analyzedJob) {
updatedTime = System.currentTimeMillis();
public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
updatedTime = analyzedJob.tblUpdateTime;
String colNameStr = analyzedJob.colName;
// colName field AnalyzeJob's format likes: "[col1, col2]", we need to remove brackets here
// TODO: Refactor this later
Expand All @@ -133,5 +135,10 @@ public void updateByJob(AnalysisInfo analyzedJob) {
}
}
jobType = analyzedJob.jobType;
if (tableIf != null && analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getBaseSchema().stream().map(Column::getName).collect(
Collectors.toSet()))) {
updatedRows.set(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -328,23 +328,34 @@ public void testReAnalyze() {

int count = 0;
int[] rowCount = new int[]{100, 200};

final Column c = new Column("col1", PrimitiveType.INT);
@Mock
public long getRowCount() {
return rowCount[count++];
}

@Mock
public List<Column> getBaseSchema() {
return Lists.newArrayList(new Column("col1", PrimitiveType.INT));
return Lists.newArrayList(c);
}

@Mock
public List<Column> getColumns() {
return Lists.newArrayList(c);
}

};
OlapTable olapTable = new OlapTable();
TableStatsMeta stats1 = new TableStatsMeta(0, 50, new AnalysisInfoBuilder().setColName("col1").build());
TableStatsMeta stats1 = new TableStatsMeta(
50, new AnalysisInfoBuilder().setColToPartitions(new HashMap<>())
.setColName("col1").build(), olapTable);
stats1.updatedRows.addAndGet(50);

Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
TableStatsMeta stats2 = new TableStatsMeta(0, 190, new AnalysisInfoBuilder().setColName("col1").build());
TableStatsMeta stats2 = new TableStatsMeta(
190, new AnalysisInfoBuilder()
.setColToPartitions(new HashMap<>()).setColName("col1").build(), olapTable);
stats2.updatedRows.addAndGet(20);
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));

Expand Down
Loading