Skip to content

Commit

Permalink
loadbalancer-experimental: Rename ConnectionPool* types to Connection…
Browse files Browse the repository at this point in the history
…Selector* (#3145)

Motivation:

A ConnectionPoolPolicy doesn't actually control pooling, but selection
from that pool. The name is also inconsistent with the thing it generates:
a ConnectionSelector.

Modifications:

Rename the types to ConnectionSelector* for more consistency.
  • Loading branch information
bryce-anderson authored Dec 19, 2024
1 parent 5fb80d0 commit 79b24c2
Show file tree
Hide file tree
Showing 13 changed files with 60 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
import io.servicetalk.client.api.LoadBalancedConnection;

/**
* A factory to create different {@link ConnectionPoolPolicy} variants.
* A factory to create different {@link ConnectionSelectorPolicy} variants.
*/
public final class ConnectionPoolPolicies {
public final class ConnectionSelectorPolicies {
private static final int DEFAULT_MAX_EFFORT = 5;
private static final int DEFAULT_LINEAR_SEARCH_SPACE = 16;

private ConnectionPoolPolicies() {
private ConnectionSelectorPolicies() {
// no instances
}

/**
* A connection selection policy that prioritizes a configurable "core" pool.
* <p>
* This {@link ConnectionPoolPolicy} attempts to emulate the pooling behavior often seen in thread pools.
* This {@link ConnectionSelectorPolicy} attempts to emulate the pooling behavior often seen in thread pools.
* Specifically it allows for the configuration of a "core pool" size which are intended to be long-lived.
* Iteration starts in the core pool at a random position and then iterates through the entire core pool before
* moving to an overflow pool. Because iteration of this core pool starts at a random position the core connections
Expand All @@ -46,47 +46,48 @@ private ConnectionPoolPolicies() {
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
* @return the configured {@link ConnectionSelectorPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> corePool(final int corePoolSize,
final boolean forceCorePool) {
public static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> corePool(final int corePoolSize,
final boolean forceCorePool) {
return CorePoolConnectionSelector.factory(corePoolSize, forceCorePool);
}

/**
* A connection selection policy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolPolicy} attempts to minimize the number of connections by attempting to direct
* This {@link ConnectionSelectorPolicy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
*
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
* @return the configured {@link ConnectionSelectorPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> linearSearch() {
public static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> linearSearch() {
return linearSearch(DEFAULT_LINEAR_SEARCH_SPACE);
}

/**
* A connection selection policy that prioritizes connection reuse.
* <p>
* This {@link ConnectionPoolPolicy} attempts to minimize the number of connections by attempting to direct
* This {@link ConnectionSelectorPolicy} attempts to minimize the number of connections by attempting to direct
* traffic to connections in the order they were created in linear order up until a configured quantity. After
* this linear pool is exhausted the remaining connections will be selected from at random. Prioritizing traffic
* to the existing connections will let tailing connections be removed due to idleness.
*
* @param linearSearchSpace the space to search linearly before resorting to random selection for remaining
* connections.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
* @return the configured {@link ConnectionSelectorPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> linearSearch(final int linearSearchSpace) {
public static <C extends LoadBalancedConnection>
ConnectionSelectorPolicy<C> linearSearch(final int linearSearchSpace) {
return LinearSearchConnectionSelector.factory(linearSearchSpace);
}

/**
* A {@link ConnectionPoolPolicy} that attempts to discern between the health of individual connections.
* A {@link ConnectionSelectorPolicy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C policy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* <ol>
Expand All @@ -104,15 +105,15 @@ public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> linearS
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
* @return the configured {@link ConnectionSelectorPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> p2c(final int corePoolSize,
final boolean forceCorePool) {
public static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> p2c(final int corePoolSize,
final boolean forceCorePool) {
return p2c(DEFAULT_MAX_EFFORT, corePoolSize, forceCorePool);
}

/**
* A {@link ConnectionPoolPolicy} that attempts to discern between the health of individual connections.
* A {@link ConnectionSelectorPolicy} that attempts to discern between the health of individual connections.
* If individual connections have health data the P2C policy can be used to bias traffic toward the best
* connections. This has the following algorithm:
* <ol>
Expand All @@ -131,11 +132,11 @@ public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> p2c(fin
* @param forceCorePool whether to avoid selecting connections from the core pool until it has reached the
* configured core pool size.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
* @return the configured {@link ConnectionPoolPolicy}.
* @return the configured {@link ConnectionSelectorPolicy}.
*/
public static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> p2c(final int maxEffort,
final int corePoolSize,
final boolean forceCorePool) {
public static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> p2c(final int maxEffort,
final int corePoolSize,
final boolean forceCorePool) {
return P2CConnectionSelector.factory(maxEffort, corePoolSize, forceCorePool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
* Configuration of the policy for selecting connections from a pool to the same endpoint.
* @param <C> the concrete type of the {@link LoadBalancedConnection}
*/
public abstract class ConnectionPoolPolicy<C extends LoadBalancedConnection> {
public abstract class ConnectionSelectorPolicy<C extends LoadBalancedConnection> {

ConnectionPoolPolicy() {
ConnectionSelectorPolicy() {
// package private constructor to control proliferation
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public C select(List<C> connections, Predicate<C> selector) {
return null;
}

static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> factory(
static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> factory(
int corePoolSize, boolean forceCorePool) {
return new CorePoolConnectionSelectorFactory<>(corePoolSize, forceCorePool);
}

private static final class CorePoolConnectionSelectorFactory<C extends LoadBalancedConnection>
extends ConnectionPoolPolicy<C> {
extends ConnectionSelectorPolicy<C> {

private final int corePoolSize;
private final boolean forceCorePool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
* @param priorityStrategyFactory a builder of the {@link HostPriorityStrategy} to use with the load balancer.
* @param loadBalancingPolicy a factory of the initial host selector to use with this load balancer.
* @param subsetter a subset builder.
* @param connectionPoolPolicy factory of the connection pool strategy to use with this load balancer.
* @param connectionSelectorPolicy factory of the connection pool strategy to use with this load balancer.
* @param connectionFactory a function which creates new connections.
* @param loadBalancerObserverFactory factory used to build a {@link LoadBalancerObserver} to use with this
* load balancer.
Expand All @@ -144,7 +144,7 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
final Function<String, HostPriorityStrategy> priorityStrategyFactory,
final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
final Subsetter subsetter,
final ConnectionPoolPolicy<C> connectionPoolPolicy,
final ConnectionSelectorPolicy<C> connectionSelectorPolicy,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final LoadBalancerObserverFactory loadBalancerObserverFactory,
@Nullable final HealthCheckConfig healthCheckConfig,
Expand All @@ -155,8 +155,8 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio
.buildSelector(Collections.emptyList(), lbDescription);
this.priorityStrategy = requireNonNull(
priorityStrategyFactory, "priorityStrategyFactory").apply(lbDescription);
this.connectionSelector = requireNonNull(connectionPoolPolicy,
"connectionPoolPolicy").buildConnectionSelector(lbDescription);
this.connectionSelector = requireNonNull(connectionSelectorPolicy,
"connectionSelectorPolicy").buildConnectionSelector(lbDescription);
this.eventPublisher = requireNonNull(eventPublisher);
this.eventStream = fromSource(eventStreamProcessor)
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ final class DefaultLoadBalancerBuilder<ResolvedAddress, C extends LoadBalancedCo
@Nullable
private LoadBalancerObserverFactory loadBalancerObserverFactory;
private LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy = defaultLoadBalancingPolicy();
private ConnectionPoolPolicy<C> connectionPoolPolicy = defaultConnectionSelectorFactory();
private ConnectionSelectorPolicy<C> connectionSelectorPolicy = defaultConnectionSelectorPolicy();
private OutlierDetectorConfig outlierDetectorConfig = OutlierDetectorConfig.DEFAULT_CONFIG;

// package private constructor so users must funnel through providers in `LoadBalancers`
Expand Down Expand Up @@ -69,9 +69,9 @@ public LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(OutlierDete
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> connectionPoolPolicy(
ConnectionPoolPolicy<C> connectionPoolPolicy) {
this.connectionPoolPolicy = requireNonNull(connectionPoolPolicy, "connectionPoolPolicy");
public LoadBalancerBuilder<ResolvedAddress, C> connectionSelectorPolicy(
ConnectionSelectorPolicy<C> connectionSelectorPolicy) {
this.connectionSelectorPolicy = requireNonNull(connectionSelectorPolicy, "connectionSelectorPolicy");
return this;
}

Expand All @@ -84,30 +84,30 @@ public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backg
@Override
public LoadBalancerFactory<ResolvedAddress, C> build() {
return new DefaultLoadBalancerFactory<>(id, loadBalancingPolicy, loadBalancerObserverFactory,
connectionPoolPolicy, outlierDetectorConfig, getExecutor());
connectionSelectorPolicy, outlierDetectorConfig, getExecutor());
}

static final class DefaultLoadBalancerFactory<ResolvedAddress, C extends LoadBalancedConnection>
implements LoadBalancerFactory<ResolvedAddress, C> {

private final String id;
private final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy;
private final ConnectionPoolPolicy<C> connectionPoolPolicy;
private final ConnectionSelectorPolicy<C> connectionSelectorPolicy;
private final OutlierDetectorConfig outlierDetectorConfig;
@Nullable
private final LoadBalancerObserverFactory loadBalancerObserverFactory;
private final Executor executor;

DefaultLoadBalancerFactory(final String id, final LoadBalancingPolicy<ResolvedAddress, C> loadBalancingPolicy,
@Nullable final LoadBalancerObserverFactory loadBalancerObserverFactory,
final ConnectionPoolPolicy<C> connectionPoolPolicy,
final ConnectionSelectorPolicy<C> connectionSelectorPolicy,
final OutlierDetectorConfig outlierDetectorConfig,
final Executor executor) {
this.id = requireNonNull(id, "id");
this.loadBalancingPolicy = requireNonNull(loadBalancingPolicy, "loadBalancingPolicy");
this.loadBalancerObserverFactory = loadBalancerObserverFactory;
this.outlierDetectorConfig = requireNonNull(outlierDetectorConfig, "outlierDetectorConfig");
this.connectionPoolPolicy = requireNonNull(connectionPoolPolicy, "connectionPoolPolicy");
this.connectionSelectorPolicy = requireNonNull(connectionSelectorPolicy, "connectionSelectorPolicy");
this.executor = requireNonNull(executor, "executor");
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public LoadBalancer<C> newLoadBalancer(
}
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
DefaultHostPriorityStrategy::new, loadBalancingPolicy, new RandomSubsetter(Integer.MAX_VALUE),
connectionPoolPolicy, connectionFactory,
connectionSelectorPolicy, connectionFactory,
loadBalancerObserverFactory, healthCheckConfig, outlierDetectorFactory);
}

Expand All @@ -169,7 +169,7 @@ public String toString() {
return "DefaultLoadBalancerFactory{" +
"id='" + id + '\'' +
", loadBalancingPolicy=" + loadBalancingPolicy +
", connectionPoolPolicy=" + connectionPoolPolicy +
", connectionSelectorPolicy=" + connectionSelectorPolicy +
", outlierDetectorConfig=" + outlierDetectorConfig +
", loadBalancerObserverFactory=" + loadBalancerObserverFactory +
", executor=" + executor +
Expand All @@ -187,8 +187,8 @@ LoadBalancingPolicy<ResolvedAddress, C> defaultLoadBalancingPolicy() {
return LoadBalancingPolicies.roundRobin().build();
}

private static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C>
defaultConnectionSelectorFactory() {
return ConnectionPoolPolicies.linearSearch();
private static <C extends LoadBalancedConnection>
ConnectionSelectorPolicy<C> defaultConnectionSelectorPolicy() {
return ConnectionSelectorPolicies.linearSearch();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public LoadBalancerBuilder<ResolvedAddress, C> backgroundExecutor(Executor backg
}

@Override
public LoadBalancerBuilder<ResolvedAddress, C> connectionPoolPolicy(
ConnectionPoolPolicy<C> connectionPoolPolicy) {
delegate = delegate.connectionPoolPolicy(connectionPoolPolicy);
public LoadBalancerBuilder<ResolvedAddress, C> connectionSelectorPolicy(
ConnectionSelectorPolicy<C> connectionSelectorPolicy) {
delegate = delegate.connectionSelectorPolicy(connectionSelectorPolicy);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ public C select(List<C> connections, Predicate<C> selector) {
return null;
}

static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> factory(final int linearSearchSpace) {
static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> factory(final int linearSearchSpace) {
return new LinearSearchConnectionSelectorFactory<>(linearSearchSpace);
}

private static final class LinearSearchConnectionSelectorFactory<C extends LoadBalancedConnection>
extends ConnectionPoolPolicy<C> {
extends ConnectionSelectorPolicy<C> {

private final int linearSearchSpace;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ LoadBalancerBuilder<ResolvedAddress, C> loadBalancerObserver(
LoadBalancerBuilder<ResolvedAddress, C> outlierDetectorConfig(OutlierDetectorConfig outlierDetectorConfig);

/**
* Set the {@link ConnectionPoolPolicy} to use with this load balancer.
* Set the {@link ConnectionSelectorPolicy} to use with this load balancer.
*
* @param connectionPoolPolicy the factory of connection pooling strategies to use.
* @param connectionSelectorPolicy the factory of connection selection strategies to use.
* @return {@code this}
*/
LoadBalancerBuilder<ResolvedAddress, C> connectionPoolPolicy(ConnectionPoolPolicy<C> connectionPoolPolicy);
LoadBalancerBuilder<ResolvedAddress, C> connectionSelectorPolicy(
ConnectionSelectorPolicy<C> connectionSelectorPolicy);

/**
* Set the background {@link Executor} to use for determining time and scheduling background tasks such
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,13 @@ private C p2cPick(ThreadLocalRandom rnd, int randomSearchSpace, List<C> connecti
return null;
}

static <C extends LoadBalancedConnection> ConnectionPoolPolicy<C> factory(
static <C extends LoadBalancedConnection> ConnectionSelectorPolicy<C> factory(
final int maxEffort, final int corePoolSize, final boolean forceCorePool) {
return new P2CConnectionSelectorFactory<>(maxEffort, corePoolSize, forceCorePool);
}

private static final class P2CConnectionSelectorFactory<C extends LoadBalancedConnection>
extends ConnectionPoolPolicy<C> {
extends ConnectionSelectorPolicy<C> {

private final int maxEffort;
private final int corePoolSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public LoadBalancerFactory<ResolvedAddress, C> build() {
}
return builder.outlierDetectorConfig(outlierDetectorConfig)
.loadBalancingPolicy(loadBalancingPolicy)
.connectionPoolPolicy(ConnectionPoolPolicies.linearSearch(linearSearchSpace))
.connectionSelectorPolicy(ConnectionSelectorPolicies.linearSearch(linearSearchSpace))
.build();
}
}
Expand Down
Loading

0 comments on commit 79b24c2

Please sign in to comment.