Skip to content

Commit

Permalink
[AMORO-3239] Fix stack overflow caused by reading too many partitions…
Browse files Browse the repository at this point in the history
… in the filter (#3240)

* [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter

* [AMORO-3239] Add the "ignore-filter-partition-count" parameter

* Rename parameter "optimizer.ignore-filter-partition-count" to "self-optimizing.skip-filter-partition-count"

* Rename parameter "self-optimizing.skip-filter-partition-count" to "refresh-tables.max-pending-partition-count"
  • Loading branch information
7hong authored and zhoujinsong committed Oct 30, 2024
1 parent 73c0856 commit e73ca0a
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ public class AmoroManagementConf {
.defaultValue(60000L)
.withDescription("Interval for refreshing table metadata.");

/**
 * Config option {@code refresh-tables.max-pending-partition-count} (integer, default 100).
 *
 * <p>The value is read when the table refresh executor is set up and is passed through
 * {@code TableRuntimeRefreshExecutor} into {@code OptimizingEvaluator}, where it caps the number
 * of necessary partition plans retained after pending-input evaluation.
 *
 * <p>NOTE(review): the description below talks about "filters"; presumably this refers to the
 * partition filter the planner derives from the pending input -- confirm against the planner.
 */
public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
ConfigOptions.key("refresh-tables.max-pending-partition-count")
.intType()
.defaultValue(100)
.withDescription("Filters will not be used beyond that number of partitions");

public static final ConfigOption<Long> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class OptimizingEvaluator {

Expand All @@ -58,14 +59,17 @@ public class OptimizingEvaluator {
protected final MixedTable mixedTable;
protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
protected final int maxPendingPartitions;
protected boolean isInitialized = false;

protected Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();

public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
public OptimizingEvaluator(
TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
this.tableRuntime = tableRuntime;
this.mixedTable = table;
this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
this.maxPendingPartitions = maxPendingPartitions;
}

public TableRuntime getTableRuntime() {
Expand Down Expand Up @@ -129,7 +133,10 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
mixedTable.id(),
count,
System.currentTimeMillis() - startTime);
partitionPlanMap.values().removeIf(plan -> !plan.isNecessary());
partitionPlanMap = partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
.limit(maxPendingPartitions)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private Map<String, String> partitionProperties(Pair<Integer, StructLike> partition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public OptimizingPlanner(
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
super(tableRuntime, table);
super(tableRuntime, table, Integer.MAX_VALUE);
this.partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public void setup(TableManager tableManager, Configurations conf) {
new TableRuntimeRefreshExecutor(
tableManager,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor {

// Refresh interval in milliseconds (configured default: 60000, i.e. 1 minute)
private final long interval;
private final int maxPendingPartitions;

public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, long interval) {
public TableRuntimeRefreshExecutor(
TableManager tableRuntimes, int poolSize, long interval, int maxPendingPartitions) {
super(tableRuntimes, poolSize);
this.interval = interval;
this.maxPendingPartitions = maxPendingPartitions;
}

@Override
Expand All @@ -48,7 +51,8 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {

private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) {
if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) {
OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, table);
OptimizingEvaluator evaluator =
new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput = evaluator.getPendingInput();
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ protected void reboot() throws InterruptedException {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {

public TableRuntimeRefresher() {
super(tableService(), 1, Integer.MAX_VALUE);
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
}

void refreshPending() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testFragmentFiles() {
}

protected OptimizingEvaluator buildOptimizingEvaluator() {
return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
}

protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
Expand Down
1 change: 1 addition & 0 deletions amoro-ams/dist/src/main/amoro-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ ams:
refresh-tables:
thread-count: 10
interval: 60000 # 1min
max-pending-partition-count: 100 # max partitions kept per table when evaluating pending input (default 100)

self-optimizing:
commit-thread-count: 10
Expand Down

0 comments on commit e73ca0a

Please sign in to comment.