@@ -62,6 +62,7 @@ public class ShowAnalyzeStmt extends ShowStmt {
.add("schedule_type")
.add("start_time")
.add("end_time")
.add("priority")
.build();

private long jobId;
@@ -1503,7 +1503,7 @@ public void setEnableLeftZigZag(boolean enableLeftZigZag) {
@VariableMgr.VarAttr(name = ENABLE_AUTO_ANALYZE_INTERNAL_CATALOG,
description = {"临时参数,收否自动收集所有内表", "Temp variable, enable to auto collect all OlapTable."},
flag = VariableMgr.GLOBAL)
public boolean enableAutoAnalyzeInternalCatalog = false;
public boolean enableAutoAnalyzeInternalCatalog = true;

@VariableMgr.VarAttr(name = AUTO_ANALYZE_TABLE_WIDTH_THRESHOLD,
description = {"参与自动收集的最大表宽度,列数多于这个参数的表不参与自动收集",
@@ -2859,6 +2859,7 @@ private void handleShowAnalyze() {
java.time.ZoneId.systemDefault());
row.add(startTime.format(formatter));
row.add(endTime.format(formatter));
row.add(analysisInfo.priority.name());
resultRows.add(row);
} catch (Exception e) {
LOG.warn("Failed to get analyze info for table {}.{}.{}, reason: {}",
@@ -2876,8 +2877,7 @@ private void handleShowAutoAnalyzePendingJobs() {
for (AutoAnalysisPendingJob job : jobs) {
try {
List<String> row = new ArrayList<>();
CatalogIf<? extends DatabaseIf<? extends TableIf>> c
= StatisticsUtil.findCatalog(job.catalogName);
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(job.catalogName);
row.add(c.getName());
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(job.dbName);
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
@@ -200,8 +200,12 @@ public enum ScheduleType {
*/
public final long tblUpdateTime;

@SerializedName("userInject")
public final boolean userInject;

@SerializedName("priority")
public final JobPriority priority;

public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId,
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
@@ -210,7 +214,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn, long tblUpdateTime, long rowCount, boolean userInject,
long updateRows) {
long updateRows, JobPriority priority) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
@@ -249,6 +253,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.rowCount = rowCount;
this.userInject = userInject;
this.updateRows = updateRows;
this.priority = priority;
}

@Override
@@ -293,6 +298,7 @@ public String toString() {
sj.add("rowCount: " + rowCount);
sj.add("userInject: " + userInject);
sj.add("updateRows: " + updateRows);
sj.add("priority: " + priority.name());
return sj.toString();
}

@@ -65,6 +65,7 @@ public class AnalysisInfoBuilder {
private long rowCount;
private boolean userInject;
private long updateRows;
private JobPriority priority;

public AnalysisInfoBuilder() {
}
@@ -105,6 +106,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
rowCount = info.rowCount;
userInject = info.userInject;
updateRows = info.updateRows;
priority = info.priority;
}

public AnalysisInfoBuilder setJobId(long jobId) {
@@ -282,12 +284,18 @@ public AnalysisInfoBuilder setUpdateRows(long updateRows) {
return this;
}

public AnalysisInfoBuilder setPriority(JobPriority priority) {
this.priority = priority;
return this;
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, rowCount, userInject, updateRows);
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, rowCount, userInject, updateRows,
priority);
}

}
@@ -219,7 +219,8 @@ public List<AnalysisInfo> buildAnalysisInfosForDB(DatabaseIf<TableIf> db, Analyz
public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException {
// Using auto analyzer if user specifies.
if (stmt.getAnalyzeProperties().getProperties().containsKey("use.auto.analyzer")) {
Env.getCurrentEnv().getStatisticsAutoCollector().processOneJob(stmt.getTable(), stmt.getColumnNames());
Env.getCurrentEnv().getStatisticsAutoCollector()
.processOneJob(stmt.getTable(), stmt.getColumnNames(), JobPriority.HIGH);
return;
}
AnalysisInfo jobInfo = buildAndAssignJob(stmt);
@@ -422,6 +423,7 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
infoBuilder.setRowCount(rowCount);
TableStatsMeta tableStatsStatus = findTableStatsStatus(table.getId());
infoBuilder.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get());
infoBuilder.setPriority(JobPriority.MANUAL);
return infoBuilder.build();
}

@@ -1230,12 +1232,14 @@ protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityC
public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
Collection<TQueryColumn> midColumns) {
for (TQueryColumn c : highColumns) {
if (!highPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) {
if (!highPriorityColumns.offer(new HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
Long.parseLong(c.tblId), c.colName))) {
break;
}
}
for (TQueryColumn c : midColumns) {
if (!midPriorityColumns.offer(new HighPriorityColumn(c.catalogId, c.dbId, c.tblId, c.colName))) {
if (!midPriorityColumns.offer(new HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
Long.parseLong(c.tblId), c.colName))) {
break;
}
}
@@ -32,14 +32,16 @@
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FollowerColumnSender extends MasterDaemon {

private static final Logger LOG = LogManager.getLogger(FollowerColumnSender.class);

public static final long INTERVAL = 5000;
public static final long INTERVAL = 60000;

public FollowerColumnSender() {
super("Follower Column Sender", INTERVAL);
@@ -68,21 +70,28 @@ protected void send() {
if (analysisManager.highPriorityColumns.isEmpty() && analysisManager.midPriorityColumns.isEmpty()) {
return;
}
List<TQueryColumn> highPriorityColumns
Set<TQueryColumn> highPriorityColumns
= analysisManager.highPriorityColumns
.stream()
.filter(c -> StatisticsUtil.needAnalyzeColumn(c))
.map(HighPriorityColumn::toThrift)
.collect(Collectors.toList());
List<TQueryColumn> midPriorityColumns
.collect(Collectors.toSet());
Set<TQueryColumn> midPriorityColumns
= analysisManager.midPriorityColumns
.stream()
.filter(c -> StatisticsUtil.needAnalyzeColumn(c))
.filter(c -> !highPriorityColumns.contains(c))
.map(HighPriorityColumn::toThrift)
.collect(Collectors.toList());
.collect(Collectors.toSet());
analysisManager.highPriorityColumns.clear();
analysisManager.midPriorityColumns.clear();
TSyncQueryColumns queryColumns = new TSyncQueryColumns();
queryColumns.highPriorityColumns = highPriorityColumns;
queryColumns.midPriorityColumns = midPriorityColumns;
List<TQueryColumn> highs = new ArrayList<>();
highs.addAll(highPriorityColumns);
queryColumns.highPriorityColumns = highs;
List<TQueryColumn> mids = new ArrayList<>();
mids.addAll(midPriorityColumns);
queryColumns.midPriorityColumns = mids;
Frontend master = null;
try {
InetSocketAddress masterAddress = currentEnv.getHaProtocol().getLeader();
@@ -57,9 +57,9 @@ public boolean equals(Object other) {

public TQueryColumn toThrift() {
TQueryColumn tQueryColumn = new TQueryColumn();
tQueryColumn.catalogId = catalogId;
tQueryColumn.dbId = dbId;
tQueryColumn.tblId = tblId;
tQueryColumn.catalogId = String.valueOf(catalogId);
tQueryColumn.dbId = String.valueOf(dbId);
tQueryColumn.tblId = String.valueOf(tblId);
tQueryColumn.colName = colName;
return tQueryColumn;
}
@@ -20,5 +20,6 @@
public enum JobPriority {
HIGH,
MID,
LOW;
LOW,
MANUAL;
}
@@ -23,9 +23,9 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -58,29 +59,22 @@ public StatisticsAutoCollector() {
@Override
protected void collect() {
while (canCollect()) {
Map.Entry<TableName, Set<String>> job = getJob();
Pair<Entry<TableName, Set<String>>, JobPriority> job = getJob();
if (job == null) {
// No more job to process, break and sleep.
break;
}
try {
TableName tblName = job.getKey();
TableName tblName = job.first.getKey();
TableIf table = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl());
if (!supportAutoAnalyze(table)) {
continue;
}
Set<String> columns = job.getValue()
.stream()
.filter(c -> {
boolean needAnalyzeColumn = needAnalyzeColumn(table, c);
LOG.info("Need analyze column " + c + " ? " + needAnalyzeColumn);
return needAnalyzeColumn;
})
.collect(Collectors.toSet());
processOneJob(table, columns);
Set<String> columns = job.first.getValue().stream().collect(Collectors.toSet());
processOneJob(table, columns, job.second);
} catch (Exception e) {
LOG.warn("Failed to analyze table {} with columns [{}]",
job.getKey().getTbl(), job.getValue().stream().collect(Collectors.joining(",")), e);
LOG.warn("Failed to analyze table {} with columns [{}]", job.first.getKey().getTbl(),
job.first.getValue().stream().collect(Collectors.joining(",")), e);
}
}
}
@@ -90,18 +84,18 @@ protected boolean canCollect() {
&& StatisticsUtil.inAnalyzeTime(LocalTime.now(TimeUtils.getTimeZone().toZoneId()));
}

protected Map.Entry<TableName, Set<String>> getJob() {
protected Pair<Entry<TableName, Set<String>>, JobPriority> getJob() {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
Optional<Map.Entry<TableName, Set<String>>> job = fetchJobFromMap(manager.highPriorityJobs);
Optional<Entry<TableName, Set<String>>> job = fetchJobFromMap(manager.highPriorityJobs);
if (job.isPresent()) {
return job.get();
return Pair.of(job.get(), JobPriority.HIGH);
}
job = fetchJobFromMap(manager.midPriorityJobs);
if (job.isPresent()) {
return job.get();
return Pair.of(job.get(), JobPriority.MID);
}
job = fetchJobFromMap(manager.lowPriorityJobs);
return job.isPresent() ? job.get() : null;
return job.isPresent() ? Pair.of(job.get(), JobPriority.LOW) : null;
}

protected Optional<Map.Entry<TableName, Set<String>>> fetchJobFromMap(Map<TableName, Set<String>> jobMap) {
@@ -112,12 +106,12 @@ protected Optional<Map.Entry<TableName, Set<String>>> fetchJobFromMap(Map<TableN
}
}

protected void processOneJob(TableIf table, Set<String> columns) throws DdlException {
protected void processOneJob(TableIf table, Set<String> columns, JobPriority priority) throws DdlException {
appendPartitionColumns(table, columns);
if (columns.isEmpty()) {
return;
}
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns);
AnalysisInfo analyzeJob = createAnalyzeJobForTbl(table, columns, priority);
LOG.info("Analyze job : {}", analyzeJob.toString());
createSystemAnalysisJob(analyzeJob);
}
@@ -134,69 +128,6 @@ protected void appendPartitionColumns(TableIf table, Set<String> columns) {
}
}

// TODO: Need refactor, hard to understand now.
protected boolean needAnalyzeColumn(TableIf table, String column) {
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
TableStatsMeta tableStatsStatus = manager.findTableStatsStatus(table.getId());
if (tableStatsStatus == null) {
return true;
}
if (tableStatsStatus.userInjected) {
return false;
}
ColStatsMeta columnStatsMeta = tableStatsStatus.findColumnStatsMeta(column);
if (columnStatsMeta == null) {
return true;
}
if (table instanceof OlapTable) {
long currentUpdatedRows = tableStatsStatus.updatedRows.get();
long lastAnalyzeUpdateRows = columnStatsMeta.updatedRows;
if (lastAnalyzeUpdateRows == 0 && currentUpdatedRows > 0) {
return true;
}
if (lastAnalyzeUpdateRows > currentUpdatedRows) {
// Shouldn't happen. Just in case.
return true;
}
OlapTable olapTable = (OlapTable) table;
long currentRowCount = olapTable.getRowCount();
long lastAnalyzeRowCount = columnStatsMeta.rowCount;
if (tableStatsStatus.newPartitionLoaded.get() && olapTable.isPartitionColumn(column)) {
return true;
}
if (lastAnalyzeRowCount == 0 && currentRowCount > 0) {
return true;
}
if (currentUpdatedRows == lastAnalyzeUpdateRows) {
return false;
}
double healthValue = ((double) (currentUpdatedRows - lastAnalyzeUpdateRows)
/ (double) currentUpdatedRows) * 100.0;
LOG.info("Column " + column + " update rows health value is " + healthValue);
if (healthValue < StatisticsUtil.getTableStatsHealthThreshold()) {
return true;
}
if (currentRowCount == 0 && lastAnalyzeRowCount != 0) {
return true;
}
if (currentRowCount == 0 && lastAnalyzeRowCount == 0) {
return false;
}
healthValue = ((double) (currentRowCount - lastAnalyzeRowCount) / (double) currentRowCount) * 100.0;
return healthValue < StatisticsUtil.getTableStatsHealthThreshold();
} else {
if (!(table instanceof HMSExternalTable)) {
return false;
}
HMSExternalTable hmsTable = (HMSExternalTable) table;
if (!hmsTable.getDlaType().equals(DLAType.HIVE)) {
return false;
}
return System.currentTimeMillis()
- tableStatsStatus.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}
}

protected boolean supportAutoAnalyze(TableIf tableIf) {
if (tableIf == null) {
return false;
@@ -206,7 +137,7 @@ protected boolean supportAutoAnalyze(TableIf tableIf) {
&& ((HMSExternalTable) tableIf).getDlaType().equals(HMSExternalTable.DLAType.HIVE);
}

protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> columns) {
protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> columns, JobPriority priority) {
AnalysisMethod analysisMethod = table.getDataSize(true) >= StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
AnalysisManager manager = Env.getServingEnv().getAnalysisManager();
@@ -236,6 +167,7 @@ protected AnalysisInfo createAnalyzeJobForTbl(TableIf table, Set<String> columns
.setRowCount(rowCount)
.setUpdateRows(tableStatsStatus == null ? 0 : tableStatsStatus.updatedRows.get())
.setColToPartitions(colToPartitions)
.setPriority(priority)
.build();
}
}