Skip to content

Commit

Permalink
Limit max pool size for dynamic parallel execution
Browse files Browse the repository at this point in the history
This is a followup of #3044 for the `dynamic` strategy.

Fixes: #3205
  • Loading branch information
mpkorstanje committed Mar 24, 2023
1 parent ee29268 commit 64cd26b
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,20 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
int parallelism = Math.max(1,
factor.multiply(BigDecimal.valueOf(Runtime.getRuntime().availableProcessors())).intValue());

return new DefaultParallelExecutionConfiguration(parallelism, parallelism, 256 + parallelism, parallelism,
KEEP_ALIVE_SECONDS, null);
int maxPoolSize = configurationParameters.get(CONFIG_DYNAMIC_MAX_POOL_SIZE_FACTOR_PROPERTY_NAME,
BigDecimal::new).map(maxPoolSizeFactor -> {
Preconditions.condition(maxPoolSizeFactor.compareTo(BigDecimal.ONE) >= 0,
() -> String.format(
"Factor '%s' specified via configuration parameter '%s' must be greater than or equal to 1",
factor, CONFIG_DYNAMIC_FACTOR_PROPERTY_NAME));
return maxPoolSizeFactor.multiply(BigDecimal.valueOf(parallelism)).intValue();
}).orElseGet(() -> 256 + parallelism);

boolean saturate = configurationParameters.get(CONFIG_DYNAMIC_SATURATE_PROPERTY_NAME,
Boolean::valueOf).orElse(true);

return new DefaultParallelExecutionConfiguration(parallelism, parallelism, maxPoolSize, parallelism,
KEEP_ALIVE_SECONDS, __ -> saturate);
}
},

Expand Down Expand Up @@ -154,12 +166,44 @@ public ParallelExecutionConfiguration createConfiguration(ConfigurationParameter
* Property name of the factor used to determine the desired parallelism for the
* {@link #DYNAMIC} configuration strategy.
*
* <p>Value must be a decimal number; defaults to {@code 1}.
* <p>Value must be a non-negative decimal number; defaults to {@code 1}.
*
* @see #DYNAMIC
*/
public static final String CONFIG_DYNAMIC_FACTOR_PROPERTY_NAME = "dynamic.factor";

/**
* Property name of the factor used to determine the maximum pool size of
* the underlying fork-join pool for the {@link #DYNAMIC} configuration
* strategy.
*
* <p>Value must be a decimal number equal and greater than or equal to
* {@code 1}. When not set the maximum pool size is calculated as
* {@code 256 + dynamic.factor * Runtime.getRuntime().availableProcessors()}
* instead.
*
* @since 1.10
* @see #DYNAMIC
*/
@API(status = EXPERIMENTAL, since = "1.10")
public static final String CONFIG_DYNAMIC_MAX_POOL_SIZE_FACTOR_PROPERTY_NAME = "dynamic.max-pool-size-factor";

/**
* Property name used to disable saturation of the underlying fork-join pool
* for the {@link #DYNAMIC} configuration strategy.
*
* <p>When set to {@code false} the underlying fork-join pool will reject
* additional tasks if all available workers are busy and the maximum
* pool-size would be exceeded.
* <p>Value must either {@code true} or {@code false}; defaults to {@code true}.
*
* @since 1.10
* @see #DYNAMIC
* @see #CONFIG_DYNAMIC_FACTOR_PROPERTY_NAME
*/
@API(status = EXPERIMENTAL, since = "1.10")
public static final String CONFIG_DYNAMIC_SATURATE_PROPERTY_NAME = "dynamic.saturate";

/**
* Property name used to specify the fully qualified class name of the
* {@link ParallelExecutionConfigurationStrategy} to be used by the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
class DefaultParallelExecutionConfigurationStrategyTests {

private ConfigurationParameters configParams = mock();
final ConfigurationParameters configParams = mock();

@BeforeEach
void setUp() {
Expand Down Expand Up @@ -78,7 +78,25 @@ void dynamicStrategyCreatesValidConfiguration() {
assertThat(configuration.getMinimumRunnable()).isEqualTo(availableProcessors * 2);
assertThat(configuration.getMaxPoolSize()).isEqualTo(256 + (availableProcessors * 2));
assertThat(configuration.getKeepAliveSeconds()).isEqualTo(30);
assertThat(configuration.getSaturatePredicate()).isNull();
assertThat(configuration.getSaturatePredicate().test(null)).isTrue();
}

@Test
void dynamicSaturateStrategyCreatesValidConfiguration() {
when(configParams.get("dynamic.factor")).thenReturn(Optional.of("2.0"));
when(configParams.get("dynamic.max-pool-size-factor")).thenReturn(Optional.of("3.0"));
when(configParams.get("dynamic.saturate")).thenReturn(Optional.of("false"));

ParallelExecutionConfigurationStrategy strategy = DefaultParallelExecutionConfigurationStrategy.DYNAMIC;
var configuration = strategy.createConfiguration(configParams);

var availableProcessors = Runtime.getRuntime().availableProcessors();
assertThat(configuration.getParallelism()).isEqualTo(availableProcessors * 2);
assertThat(configuration.getCorePoolSize()).isEqualTo(availableProcessors * 2);
assertThat(configuration.getMinimumRunnable()).isEqualTo(availableProcessors * 2);
assertThat(configuration.getMaxPoolSize()).isEqualTo(availableProcessors * 6);
assertThat(configuration.getKeepAliveSeconds()).isEqualTo(30);
assertThat(configuration.getSaturatePredicate().test(null)).isFalse();
}

@Test
Expand Down

0 comments on commit 64cd26b

Please sign in to comment.