Skip to content

Commit

Permalink
[improvement](statistics)Async drop stats while truncating table. (ap…
Browse files Browse the repository at this point in the history
…ache#37715)

Drop stats for table with many partitions may slow, because to
invalidate partition stats cache is time consuming. Truncate table
operation do the drop stats synchronously, so the truncate table may be
very slow for partition tables.
This pr is to improvement the performance of truncate table. Do the drop
stats asynchronously.

Time consumed for truncate a table with 10000 partitions and 10 columns
reduced to 2.5s from 10s.
  • Loading branch information
Jibing-Li authored and dataroaring committed Jul 17, 2024
1 parent 4bb7db3 commit ba80a17
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3264,8 +3264,13 @@ public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request
if (target.partitions != null) {
partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions));
}
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
if (target.isTruncate) {
analysisManager.submitAsyncDropStatsTask(target.catalogId, target.dbId,
target.tableId, tableStats, partitionNames);
} else {
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
}
return new TStatus(TStatusCode.OK);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public class AnalysisManager implements Writable {
private StatisticsCache statisticsCache;

private AnalysisTaskExecutor taskExecutor;
private ThreadPoolExecutor dropStatsExecutors;

// Store task information in metadata.
protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
Expand All @@ -157,6 +158,11 @@ public AnalysisManager() {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
Integer.MAX_VALUE);
this.statisticsCache = new StatisticsCache();
this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
1, 1, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy(),
"Drop stats executor", true);
}
}

Expand Down Expand Up @@ -656,7 +662,7 @@ public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
invalidateRemoteStats(catalogId, dbId, tblId, cols, partitions);
invalidateRemoteStats(catalogId, dbId, tblId, cols, partitions, false);
StatisticsRepository.dropStatistics(catalogId, dbId, tblId, cols, partitions);
}

Expand All @@ -668,17 +674,51 @@ public void dropStats(TableIf table, PartitionNames partitionNames) throws DdlEx
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
invalidateLocalStats(catalogId, dbId, tableId, null, tableStats, partitionNames);
submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, partitionNames);
// Drop stats ddl is master only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions);
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), null, partitions);
}

class DropStatsTask implements Runnable {
private final long catalogId;
private final long dbId;
private final long tableId;
private final Set<String> columns;
private final TableStatsMeta tableStats;
private final PartitionNames partitionNames;

public DropStatsTask(long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
this.columns = columns;
this.tableStats = tableStats;
this.partitionNames = partitionNames;
}

@Override
public void run() {
invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
}
}

public void submitAsyncDropStatsTask(long catalogId, long dbId, long tableId,
TableStatsMeta tableStats, PartitionNames partitionNames) {
try {
dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, tableId, null, tableStats, partitionNames));
} catch (Throwable t) {
LOG.info("Failed to drop stats for truncate table {}.{}.{}. Reason:{}",
catalogId, dbId, tableId, t.getMessage());
}
}

public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames) {
if (tableStats == null) {
Expand Down Expand Up @@ -743,8 +783,9 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set<St
}

public void invalidateRemoteStats(long catalogId, long dbId, long tableId,
Set<String> columns, Set<String> partitions) {
InvalidateStatsTarget target = new InvalidateStatsTarget(catalogId, dbId, tableId, columns, partitions);
Set<String> columns, Set<String> partitions, boolean isTruncate) {
InvalidateStatsTarget target = new InvalidateStatsTarget(
catalogId, dbId, tableId, columns, partitions, isTruncate);
TInvalidateFollowerStatsCacheRequest request = new TInvalidateFollowerStatsCacheRequest();
request.key = GsonUtils.GSON.toJson(target);
StatisticsCache statisticsCache = Env.getCurrentEnv().getStatisticsCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,16 @@ public class InvalidateStatsTarget {
@SerializedName("partitions")
public final Set<String> partitions;

public InvalidateStatsTarget(long catalogId, long dbId, long tableId, Set<String> columns, Set<String> partitions) {
@SerializedName("it")
public final boolean isTruncate;

public InvalidateStatsTarget(long catalogId, long dbId, long tableId, Set<String> columns,
Set<String> partitions, boolean isTruncate) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
this.columns = columns;
this.partitions = partitions;
this.isTruncate = isTruncate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public static PartitionColumnStatistic fromResultRow(ResultRow row) {
/ partitionStatisticBuilder.getCount());
String min = row.get(9);
String max = row.get(10);
if (!"NULL".equalsIgnoreCase(min)) {
if (min != null && !"NULL".equalsIgnoreCase(min)) {
try {
partitionStatisticBuilder.setMinValue(StatisticsUtil.convertToDouble(col.getType(), min));
partitionStatisticBuilder.setMinExpr(StatisticsUtil.readableValue(col.getType(), min));
Expand All @@ -148,7 +148,7 @@ public static PartitionColumnStatistic fromResultRow(ResultRow row) {
} else {
partitionStatisticBuilder.setMinValue(Double.NEGATIVE_INFINITY);
}
if (!"NULL".equalsIgnoreCase(max)) {
if (max != null && !"NULL".equalsIgnoreCase(max)) {
try {
partitionStatisticBuilder.setMaxValue(StatisticsUtil.convertToDouble(col.getType(), max));
partitionStatisticBuilder.setMaxExpr(StatisticsUtil.readableValue(col.getType(), max));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// CHECKSTYLE OFF
public class AnalysisManagerTest {
Expand Down Expand Up @@ -639,4 +640,30 @@ public String getPriority() {
Assertions.assertTrue(job.columns.contains(Pair.of("index1", "col7")));
Assertions.assertEquals(JobPriority.LOW, job.priority);
}

@Test
public void testAsyncDropStats() throws InterruptedException {
AtomicInteger count = new AtomicInteger(0);
new MockUp<AnalysisManager>() {
@Mock
public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames) {
try {
Thread.sleep(1000);
count.incrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
AnalysisManager analysisManager = new AnalysisManager();
for (int i = 0; i < 20; i++) {
System.out.println("Submit " + i);
analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null);
}
Thread.sleep(25000);
System.out.println(count.get());
Assertions.assertTrue(count.get() > 10);
Assertions.assertTrue(count.get() < 20);
}
}

0 comments on commit ba80a17

Please sign in to comment.