@@ -1602,7 +1602,7 @@ public class Config extends ConfigBase {
"This parameter controls the time interval for automatic collection jobs to check the health of table"
+ "statistics and trigger automatic collection"
})
public static int auto_check_statistics_in_minutes = 5;
public static int auto_check_statistics_in_minutes = 1;

/**
* If set to TRUE, the compaction slower replica will be skipped when select get queryable replicas
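For orientation, here is a minimal sketch of how an interval setting such as auto_check_statistics_in_minutes could drive the periodic health check that the config description above refers to. This is an illustrative scheduler loop under assumed names, not the actual Doris code: StatisticsCheckerSketch and checkAndTriggerAutoCollection() are hypothetical placeholders.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StatisticsCheckerSketch {
    // Hypothetical stand-in for Config.auto_check_statistics_in_minutes (lowered from 5 to 1 in this change).
    private static final int CHECK_INTERVAL_MINUTES = 1;

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Re-run the health check every CHECK_INTERVAL_MINUTES minutes.
        scheduler.scheduleAtFixedRate(
                StatisticsCheckerSketch::checkAndTriggerAutoCollection,
                0, CHECK_INTERVAL_MINUTES, TimeUnit.MINUTES);
    }

    private static void checkAndTriggerAutoCollection() {
        // Placeholder: inspect table statistics health and enqueue auto analyze jobs when below threshold.
        System.out.println("checking statistics health...");
    }
}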
53 changes: 0 additions & 53 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -59,8 +59,6 @@
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.HistogramTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
@@ -1247,57 +1245,6 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
}
}

public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
if (tblStats == null) {
return true;
}
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName)
.collect(Collectors.toSet()))) {
return true;
}
long rowCount = getRowCount();
if (rowCount > 0 && tblStats.rowCount == 0) {
return true;
}
long updateRows = tblStats.updatedRows.get();
int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
return tblHealth < StatisticsUtil.getTableStatsHealthThreshold();
}

@Override
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
Set<String> allPartitions = getPartitionNames().stream().map(this::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {
Map<String, Set<String>> ret = Maps.newHashMap();
for (Column col : getSchemaAllIndexes(false)) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
ret.put(col.getName(), allPartitions);
}
return ret;
}
Map<String, Set<String>> colToPart = new HashMap<>();
for (Column col : getSchemaAllIndexes(false)) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
Set<String> partitions = getPartitionNames().stream()
.map(this::getPartition)
.filter(Partition::hasData)
.filter(partition -> partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName)
.collect(Collectors.toSet());
colToPart.put(col.getName(), partitions);
}
return colToPart;
}

@Override
public long fetchRowCount() {
long rowCount = 0;
11 changes: 0 additions & 11 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -33,7 +33,6 @@
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.base.Preconditions;
@@ -641,16 +640,6 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {

public void analyze(String dbName) {}

@Override
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
return true;
}

@Override
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
return Collections.emptyMap();
}

@Override
public List<Long> getChunkSizes() {
throw new NotImplementedException("getChunkSized not implemented");
@@ -30,7 +30,6 @@
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.ImmutableList;
@@ -182,10 +181,6 @@ default long getRowCountForNereids() {

Optional<ColumnStatistic> getColumnStatistic(String colName);

boolean needReAnalyzeTable(TableStatsMeta tblStats);

Map<String, Set<String>> findReAnalyzeNeededPartitions();

// Get all the chunk sizes of this table. Now, only HMS external table implemented this interface.
// For HMS external table, the return result is a list of all the files' size.
List<Long> getChunkSizes();
@@ -32,11 +32,8 @@
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import org.apache.commons.lang3.NotImplementedException;
@@ -46,12 +43,9 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* External table represent tables that are not self-managed by Doris.
@@ -317,31 +311,6 @@ public void gsonPostProcess() throws IOException {
objectCreated = false;
}

@Override
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
if (tblStats == null) {
return true;
}
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.map(Column::getName)
.collect(Collectors.toSet()))) {
return true;
}
return System.currentTimeMillis()
- tblStats.updatedTime > StatisticsUtil.getExternalTableAutoAnalyzeIntervalInMillis();
}

@Override
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
HashSet<String> partitions = Sets.newHashSet();
// TODO: Find a way to collect external table partitions that need to be analyzed.
partitions.add("Dummy Partition");
return getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
.collect(Collectors.toMap(Column::getName, k -> partitions));
}

@Override
public List<Long> getChunkSizes() {
throw new NotImplementedException("getChunkSized not implemented");
@@ -112,8 +112,8 @@ public class AnalysisManager implements Writable {
private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);

private static final int COLUMN_QUEUE_SIZE = 1000;
public final Queue<HighPriorityColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Queue<HighPriorityColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Queue<QueryColumn> highPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Queue<QueryColumn> midPriorityColumns = new ArrayBlockingQueue<>(COLUMN_QUEUE_SIZE);
public final Map<TableName, Set<String>> highPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<String>> midPriorityJobs = new LinkedHashMap<>();
public final Map<TableName, Set<String>> lowPriorityJobs = new LinkedHashMap<>();
@@ -307,55 +307,10 @@ private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
}
}

/**
* Gets the partitions for which statistics are to be collected. First verify that
* there are partitions that have been deleted but have historical statistics(invalid statistics),
* if there are these partitions, we need to delete them to avoid errors in summary table level statistics.
* Then get the partitions for which statistics need to be collected based on collection mode (incremental/full).
* <p>
* note:
* If there is no invalid statistics, it does not need to collect/update
* statistics if the following conditions are met:
* - in full collection mode, the partitioned table does not have partitions
* - in incremental collection mode, partition statistics already exist
* <p>
* TODO Supports incremental collection of statistics from materialized views
*/
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType) throws DdlException {

Map<String, Set<String>> columnToPartitions = columnNames.stream()
.collect(Collectors.toMap(
columnName -> columnName,
columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames)
));

if (analysisType == AnalysisType.HISTOGRAM) {
// Collecting histograms does not need to support incremental collection,
// and will automatically cover historical statistics
return columnToPartitions;
}

if (table instanceof HMSExternalTable) {
// TODO Currently, we do not support INCREMENTAL collection for external table.
// One reason is external table partition id couldn't convert to a Long value.
// Will solve this problem later.
return columnToPartitions;
}

if (analysisType == AnalysisType.FUNDAMENTALS) {
Map<String, Set<String>> result = table.findReAnalyzeNeededPartitions();
result.keySet().retainAll(columnNames);
return result;
}

return columnToPartitions;
}

// Make sure colName of job has all the column as this AnalyzeStmt specified, no matter whether it will be analyzed
// or not.
@VisibleForTesting
public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlException {
public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) {
AnalysisInfoBuilder infoBuilder = new AnalysisInfoBuilder();
long jobId = Env.getCurrentEnv().getNextId();
TableIf table = stmt.getTable();
@@ -413,9 +368,10 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio

long periodTimeInMs = stmt.getPeriodTimeInMs();
infoBuilder.setPeriodTimeInMs(periodTimeInMs);

Map<String, Set<String>> colToPartitions = validateAndGetPartitions(table, columnNames,
partitionNames, analysisType);
Map<String, Set<String>> colToPartitions = new HashMap<>();
Set<String> dummyPartition = new HashSet<>();
dummyPartition.add("dummy partition");
columnNames.stream().forEach(c -> colToPartitions.put(c, dummyPartition));
infoBuilder.setColToPartitions(colToPartitions);
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
@@ -770,6 +726,7 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId,
}
tableStats.updatedTime = 0;
tableStats.userInjected = false;
tableStats.rowCount = table.getRowCount();
}

public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
@@ -1196,16 +1153,14 @@ public boolean canSample(TableIf table) {


public void updateColumnUsedInPredicate(Set<Slot> slotReferences) {
LOG.info("Add slots to high priority queues.");
updateColumn(slotReferences, highPriorityColumns);
}

public void updateQueriedColumn(Collection<Slot> slotReferences) {
LOG.info("Add slots to mid priority queues.");
updateColumn(slotReferences, midPriorityColumns);
}

protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityColumn> queue) {
protected void updateColumn(Collection<Slot> slotReferences, Queue<QueryColumn> queue) {
for (Slot s : slotReferences) {
if (!(s instanceof SlotReference)) {
return;
@@ -1219,10 +1174,12 @@ protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityC
if (database != null) {
CatalogIf catalog = database.getCatalog();
if (catalog != null) {
queue.offer(new HighPriorityColumn(catalog.getId(), database.getId(),
queue.offer(new QueryColumn(catalog.getId(), database.getId(),
table.getId(), optionalColumn.get().getName()));
LOG.info("Offer column " + table.getName() + "(" + table.getId() + ")."
+ optionalColumn.get().getName());
if (LOG.isDebugEnabled()) {
LOG.debug("Offer column " + table.getName() + "(" + table.getId() + ")."
+ optionalColumn.get().getName());
}
}
}
}
@@ -1231,14 +1188,15 @@ protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityC

public void mergeFollowerQueryColumns(Collection<TQueryColumn> highColumns,
Collection<TQueryColumn> midColumns) {
LOG.info("Received {} high columns and {} mid columns", highColumns.size(), midColumns.size());
for (TQueryColumn c : highColumns) {
if (!highPriorityColumns.offer(new HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
if (!highPriorityColumns.offer(new QueryColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
Long.parseLong(c.tblId), c.colName))) {
break;
}
}
for (TQueryColumn c : midColumns) {
if (!midPriorityColumns.offer(new HighPriorityColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
if (!midPriorityColumns.offer(new QueryColumn(Long.parseLong(c.catalogId), Long.parseLong(c.dbId),
Long.parseLong(c.tblId), c.colName))) {
break;
}
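One detail worth calling out from the AnalysisManager changes above: the high/mid priority column queues are bounded ArrayBlockingQueues, and offer() is non-blocking, returning false as soon as the queue is full, which is why mergeFollowerQueryColumns simply breaks out of its loop instead of waiting. A small self-contained sketch of that drop-when-full behaviour, with the QueryColumn payload reduced to a plain String for brevity:

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedQueueSketch {
    public static void main(String[] args) {
        // Same idea as highPriorityColumns/midPriorityColumns, but with a tiny capacity of 2.
        Queue<String> highPriority = new ArrayBlockingQueue<>(2);

        for (String col : new String[] {"c1", "c2", "c3"}) {
            if (!highPriority.offer(col)) {
                // Queue is full: offer() returns false immediately, so the caller just stops enqueuing.
                System.out.println("queue full, dropping " + col);
                break;
            }
        }
        System.out.println("queued: " + highPriority);
    }
}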
@@ -48,7 +48,7 @@ public abstract class BaseAnalysisTask {
public static final long LIMIT_SIZE = 1024 * 1024 * 1024; // 1GB
public static final double LIMIT_FACTOR = 1.2;

protected static final String COLLECT_COL_STATISTICS =
protected static final String FULL_ANALYZE_TEMPLATE =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
+ " ${catalogId} AS `catalog_id`, "
+ " ${dbId} AS `db_id`, "
@@ -73,6 +73,8 @@ protected void setTable(ExternalTable table) {
*/
private void getTableStats() {
Map<String, String> params = buildStatsParams(null);
Pair<Double, Long> sampleInfo = getSampleInfo();
params.put("scaleFactor", String.valueOf(sampleInfo.first));
List<ResultRow> columnResult =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(ANALYZE_TABLE_COUNT_TEMPLATE));
@@ -98,7 +100,7 @@ protected void getColumnStats() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Will do full collection for column {}", col.getName());
}
sb.append(COLLECT_COL_STATISTICS);
sb.append(FULL_ANALYZE_TEMPLATE);
} else {
// Do sample analyze
if (LOG.isDebugEnabled()) {
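The hunk above puts a scaleFactor derived from getSampleInfo() into the query parameters; presumably it is used to scale counts taken from a sample back up to full-table estimates. A tiny, purely illustrative calculation of that idea; the 10% sample ratio and the row counts are made up:

public class ScaleFactorSketch {
    public static void main(String[] args) {
        // Suppose the sample scanned only 10% of the table's data.
        double sampleRatio = 0.1;
        double scaleFactor = 1.0 / sampleRatio; // what getSampleInfo().first would presumably carry

        long sampledRowCount = 12_345L;          // rows seen in the sampled scan
        long estimatedRowCount = (long) (sampledRowCount * scaleFactor);

        System.out.println("scaleFactor=" + scaleFactor + ", estimated rows=" + estimatedRowCount);
    }
}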
@@ -74,14 +74,14 @@ protected void send() {
= analysisManager.highPriorityColumns
.stream()
.filter(c -> StatisticsUtil.needAnalyzeColumn(c))
.map(HighPriorityColumn::toThrift)
.map(QueryColumn::toThrift)
.collect(Collectors.toSet());
Set<TQueryColumn> midPriorityColumns
= analysisManager.midPriorityColumns
.stream()
.filter(c -> StatisticsUtil.needAnalyzeColumn(c))
.filter(c -> !highPriorityColumns.contains(c))
.map(HighPriorityColumn::toThrift)
.map(QueryColumn::toThrift)
.collect(Collectors.toSet());
analysisManager.highPriorityColumns.clear();
analysisManager.midPriorityColumns.clear();
@@ -227,7 +227,7 @@ protected void doFull() throws Exception {
params.put("tblName", String.valueOf(tbl.getName()));
params.put("index", getIndex());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS);
String collectColStats = stringSubstitutor.replace(FULL_ANALYZE_TEMPLATE);
runQuery(collectColStats);
}

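Both FULL_ANALYZE_TEMPLATE and the doFull() call above rely on filling ${...} placeholders with a StringSubstitutor, the Apache Commons Text class that the statistics code already constructs with new StringSubstitutor(params). A minimal, self-contained sketch with a trimmed-down template; the parameter values here are invented:

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.text.StringSubstitutor;

public class TemplateFillSketch {
    public static void main(String[] args) {
        // Trimmed-down version of FULL_ANALYZE_TEMPLATE, only to show the ${...} substitution mechanism.
        String template = "SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS `id`, "
                + "${catalogId} AS `catalog_id`, "
                + "${dbId} AS `db_id`";

        Map<String, String> params = new HashMap<>();
        params.put("tblId", "10001");
        params.put("idxId", "-1");
        params.put("colId", "c_custkey");
        params.put("catalogId", "0");
        params.put("dbId", "100");

        String sql = new StringSubstitutor(params).replace(template);
        System.out.println(sql);
    }
}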