
[AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter #3240

Merged 12 commits on Oct 16, 2024
@@ -332,6 +332,12 @@ public class AmoroManagementConf {
.defaultValue(3000L)
.withDescription("Optimizer polling task timeout.");

public static final ConfigOption<Integer> OPTIMIZER_IGNORE_FILTER_PARTITIONS =
ConfigOptions.key("optimizer.ignore-filter-partition-count")
.intType()
.defaultValue(100)
        .withDescription("Partition filters will not be used when the pending partition count exceeds this value");

/** config key prefix of terminal */
public static final String TERMINAL_PREFIX = "terminal.";

@@ -100,12 +100,15 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final TableService tableService;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;
private final int ignoreFilterPartitions;

public DefaultOptimizingService(Configurations serviceConfig, DefaultTableService tableService) {
this.optimizerTouchTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.ignoreFilterPartitions =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_IGNORE_FILTER_PARTITIONS);
this.pollingTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.tableService = tableService;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
@@ -139,7 +142,8 @@ private void loadOptimizingQueues(List<TableRuntime> tableRuntimeMetaList) {
this,
planExecutor,
Optional.ofNullable(tableRuntimes).orElseGet(ArrayList::new),
maxPlanningParallelism);
maxPlanningParallelism,
ignoreFilterPartitions);
optimizingQueueByGroup.put(groupName, optimizingQueue);
});
optimizers.forEach(optimizer -> registerOptimizer(optimizer, false));
@@ -314,7 +318,8 @@ public void createResourceGroup(ResourceGroup resourceGroup) {
this,
planExecutor,
new ArrayList<>(),
maxPlanningParallelism);
maxPlanningParallelism,
ignoreFilterPartitions);
optimizingQueueByGroup.put(resourceGroup.getName(), optimizingQueue);
});
}
@@ -83,6 +83,7 @@ public class OptimizingQueue extends PersistentBase {
private final Lock scheduleLock = new ReentrantLock();
private final Condition planningCompleted = scheduleLock.newCondition();
private final int maxPlanningParallelism;
private final int ignoreFilterPartitionCount;
private final OptimizerGroupMetrics metrics;
private ResourceGroup optimizerGroup;

@@ -92,14 +93,16 @@ public OptimizingQueue(
QuotaProvider quotaProvider,
Executor planExecutor,
List<TableRuntime> tableRuntimeList,
int maxPlanningParallelism) {
int maxPlanningParallelism,
int ignoreFilterPartitionCount) {
Preconditions.checkNotNull(optimizerGroup, "Optimizer group can not be null");
this.planExecutor = planExecutor;
this.optimizerGroup = optimizerGroup;
this.quotaProvider = quotaProvider;
this.scheduler = new SchedulingPolicy(optimizerGroup);
this.tableManager = tableManager;
this.maxPlanningParallelism = maxPlanningParallelism;
this.ignoreFilterPartitionCount = ignoreFilterPartitionCount;
this.metrics =
new OptimizerGroupMetrics(
optimizerGroup.getName(), MetricManager.getInstance().getGlobalRegistry(), this);
@@ -267,7 +270,8 @@ private TableOptimizingProcess planInternal(TableRuntime tableRuntime) {
tableRuntime.refresh(table),
(MixedTable) table.originalTable(),
getAvailableCore(),
maxInputSizePerThread());
maxInputSizePerThread(),
ignoreFilterPartitionCount);
if (planner.isNecessary()) {
return new TableOptimizingProcess(planner);
} else {
@@ -43,6 +43,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class OptimizingPlanner extends OptimizingEvaluator {
@@ -64,10 +65,15 @@ public OptimizingPlanner(
TableRuntime tableRuntime,
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
long maxInputSizePerThread,
int ignoreFilterPartitionCount) {
super(tableRuntime, table);
this.partitionFilter =
tableRuntime.getPendingInput() == null
(tableRuntime.getPendingInput() == null
|| tableRuntime.getPendingInput().getPartitions().values().stream()
.mapToInt(Set::size)
.sum()
> ignoreFilterPartitionCount)
? Expressions.alwaysTrue()
: tableRuntime.getPendingInput().getPartitions().entrySet().stream()
.map(
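The guard added above can be sketched in isolation as follows. This is a minimal illustration of the threshold check, not the actual Amoro classes: the map of partition-spec IDs to partition sets stands in for `tableRuntime.getPendingInput().getPartitions()`, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.Set;

public class PartitionFilterGuard {

  /**
   * Returns true when the partition filter should be skipped, i.e. when the
   * total number of pending partitions across all partition specs exceeds the
   * configured threshold (or when there is no pending input at all). In that
   * case the planner falls back to an always-true filter.
   */
  static boolean skipPartitionFilter(
      Map<Integer, Set<String>> pendingPartitions, int ignoreFilterPartitionCount) {
    if (pendingPartitions == null) {
      return true; // no pending input recorded: fall back to always-true filter
    }
    int total = pendingPartitions.values().stream().mapToInt(Set::size).sum();
    return total > ignoreFilterPartitionCount;
  }

  public static void main(String[] args) {
    Map<Integer, Set<String>> small = Map.of(0, Set.of("p1", "p2"));
    Map<Integer, Set<String>> large = Map.of(0, Set.of("p1", "p2"), 1, Set.of("p3"));

    System.out.println(skipPartitionFilter(small, 100)); // false: filter is used
    System.out.println(skipPartitionFilter(large, 2));   // true: too many partitions
    System.out.println(skipPartitionFilter(null, 100));  // true: no pending input
  }
}
```

With the default threshold of 100, only tables with an unusually large pending-partition set skip the filter, which avoids building the oversized filter expression that caused the stack overflow.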
@@ -112,7 +112,8 @@ protected OptimizingQueue buildOptimizingGroupService(TableRuntime tableRuntime) {
quotaProvider,
planExecutor,
Collections.singletonList(tableRuntime),
1);
1,
100);
}

private OptimizingQueue buildOptimizingGroupService() {
@@ -122,7 +123,8 @@ private OptimizingQueue buildOptimizingGroupService() {
quotaProvider,
planExecutor,
Collections.emptyList(),
1);
1,
100);
}

@Test
@@ -196,7 +196,8 @@ private OptimizingPlanner planner() {
tableRuntime,
table,
availableCore,
OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT);
OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT,
100);
}

private OptimizingConfig optimizingConfig() {
@@ -100,6 +100,7 @@ protected OptimizingPlanner buildOptimizingEvaluator() {
getTableRuntime(),
getMixedTable(),
1,
OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT);
OptimizerProperties.MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT,
100);
}
}
dist/src/main/amoro-bin/conf/config.yaml (1 addition, 0 deletions)
@@ -54,6 +54,7 @@ ams:
task-ack-timeout: 30000 # 30s
polling-timeout: 3000 # 3s
max-planning-parallelism: 1 # default 1
ignore-filter-partition-count: 100 # default 100
Contributor:
I think this param is not suitable for this place; it's more suitable for the self-optimizing groups.

And I think a more proper name is 'self-optimizing.skip-evaluating-for-partition-count'. WDYT @zhoujinsong

Contributor Author:

I think it's appropriate to put it in the self-optimizing groups.

But I think skip-evaluating is inappropriate, because this parameter only controls whether the filter is used; it does not skip the evaluator.

Contributor (@majin1102, Oct 14, 2024):

Yes, that's correct. I think skip is inappropriate too.

Originally there was no evaluating and no filtering. But it was proven that planning directly would cause OOM, because all partition files were cached in memory; that led to the design of an evaluating phase before planning (stream the planning and store a partition set to avoid memory usage in the planning phase). If we do not filter here, it means evaluating is unused or skipped.

Evaluating is a sensible concept and may even be revealed on the dashboard in the future; filter, however, is quite a detailed implementation and could point to anything that evolves, or to nothing. For example, the partition filter is a fast implementation of evaluating, and several drawbacks have been fed back:

  1. it cannot filter anything for a non-partitioned table
  2. it stores too much data in sysdb
  3. the issue you have encountered

From my view, this PR is a temporary optimization, and the evaluating logic should evolve to resolve the issues above; I'm concerned that a parameter named after the filter will easily become outdated (this relates to work I am pushing in #2596).

What do you think about using a set as the filter to resolve the issue directly? Or you could maintain this evolution as related issues are raised. That would help a lot.

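The "use a set as the filter" idea from the comment above could look roughly like this. This is a hypothetical sketch, not Amoro API: the class name, `shouldPlan`, and the partition-path strings are all assumptions made for illustration.

```java
import java.util.Set;

public class PartitionSetFilter {

  /** Pending partition paths collected during the evaluating phase. */
  private final Set<String> pendingPartitions;

  PartitionSetFilter(Set<String> pendingPartitions) {
    this.pendingPartitions = pendingPartitions;
  }

  /**
   * Instead of compiling the pending partitions into one filter Expression
   * (which can overflow the stack when OR-ing thousands of predicates),
   * test each file's partition path for set membership directly.
   * A null set means no pending input was recorded, so everything is planned.
   */
  boolean shouldPlan(String partitionPath) {
    return pendingPartitions == null || pendingPartitions.contains(partitionPath);
  }

  public static void main(String[] args) {
    PartitionSetFilter filter = new PartitionSetFilter(Set.of("date=2024-10-14"));
    System.out.println(filter.shouldPlan("date=2024-10-14")); // true
    System.out.println(filter.shouldPlan("date=2024-10-15")); // false
  }
}
```

Set membership stays O(1) per file regardless of how many partitions are pending, which is why the reviewer suggests it as a direct fix rather than capping the expression-based filter with a threshold.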

blocker:
timeout: 60000 # 1min