diff --git a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java
index 5b348b7adab..a30d26a78f6 100644
--- a/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java
+++ b/api/src/test/java/io/grpc/LoadBalancerRegistryTest.java
@@ -40,7 +40,7 @@ public void getClassesViaHardcoded_classesPresent() throws Exception {
@Test
public void stockProviders() {
LoadBalancerRegistry defaultRegistry = LoadBalancerRegistry.getDefaultRegistry();
- assertThat(defaultRegistry.providers()).hasSize(3);
+ assertThat(defaultRegistry.providers()).hasSize(4);
LoadBalancerProvider pickFirst = defaultRegistry.getProvider("pick_first");
assertThat(pickFirst).isInstanceOf(PickFirstLoadBalancerProvider.class);
@@ -56,6 +56,11 @@ public void stockProviders() {
assertThat(outlierDetection.getClass().getName()).isEqualTo(
"io.grpc.util.OutlierDetectionLoadBalancerProvider");
assertThat(roundRobin.getPriority()).isEqualTo(5);
+
+ LoadBalancerProvider randomSubsetting = defaultRegistry.getProvider("random_subsetting");
+ assertThat(randomSubsetting.getClass().getName()).isEqualTo(
+ "io.grpc.util.RandomSubsettingLoadBalancerProvider");
+ assertThat(randomSubsetting.getPriority()).isEqualTo(5);
}
@Test
diff --git a/util/build.gradle b/util/build.gradle
index 6fbd6925c00..846b110b106 100644
--- a/util/build.gradle
+++ b/util/build.gradle
@@ -58,6 +58,7 @@ animalsniffer {
tasks.named("javadoc").configure {
exclude 'io/grpc/util/MultiChildLoadBalancer.java'
exclude 'io/grpc/util/OutlierDetectionLoadBalancer*'
+ exclude 'io/grpc/util/RandomSubsettingLoadBalancer*'
exclude 'io/grpc/util/RoundRobinLoadBalancer*'
}
diff --git a/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancer.java b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancer.java
new file mode 100644
index 00000000000..748dabde80d
--- /dev/null
+++ b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancer.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2025 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.Status;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Random;
+
+
+/**
+ * Wraps a child {@code LoadBalancer}, separating the total set of backends into smaller subsets for
+ * the child balancer to balance across.
+ *
+ *
This implements random subsetting gRFC:
+ * https://https://github.com/grpc/proposal/blob/master/A68-random-subsetting.md
+ */
+final class RandomSubsettingLoadBalancer extends LoadBalancer {
+ private final GracefulSwitchLoadBalancer switchLb;
+ private final HashFunction hashFunc;
+
+ public RandomSubsettingLoadBalancer(Helper helper) {
+ switchLb = new GracefulSwitchLoadBalancer(checkNotNull(helper, "helper"));
+ int seed = new Random().nextInt();
+ hashFunc = Hashing.murmur3_128(seed);
+ }
+
+ @Override
+ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
+ RandomSubsettingLoadBalancerConfig config =
+ (RandomSubsettingLoadBalancerConfig)
+ resolvedAddresses.getLoadBalancingPolicyConfig();
+
+ ResolvedAddresses subsetAddresses = filterEndpoints(resolvedAddresses, config.subsetSize);
+
+ return switchLb.acceptResolvedAddresses(
+ subsetAddresses.toBuilder()
+ .setLoadBalancingPolicyConfig(config.childConfig)
+ .build());
+ }
+
+ // implements the subsetting algorithm, as described in A68:
+ // https://github.com/grpc/proposal/pull/423
+ private ResolvedAddresses filterEndpoints(ResolvedAddresses resolvedAddresses, int subsetSize) {
+ if (subsetSize >= resolvedAddresses.getAddresses().size()) {
+ return resolvedAddresses;
+ }
+
+ ArrayList endpointWithHashList =
+ new ArrayList<>(resolvedAddresses.getAddresses().size());
+
+ for (EquivalentAddressGroup addressGroup : resolvedAddresses.getAddresses()) {
+ endpointWithHashList.add(
+ new EndpointWithHash(
+ addressGroup,
+ hashFunc.hashString(
+ addressGroup.getAddresses().get(0).toString(),
+ StandardCharsets.UTF_8)));
+ }
+
+ Collections.sort(endpointWithHashList, new HashAddressComparator());
+
+ ArrayList addressGroups = new ArrayList<>(subsetSize);
+
+ for (int idx = 0; idx < subsetSize; ++idx) {
+ addressGroups.add(endpointWithHashList.get(idx).addressGroup);
+ }
+
+ return resolvedAddresses.toBuilder().setAddresses(addressGroups).build();
+ }
+
+ @Override
+ public void handleNameResolutionError(Status error) {
+ switchLb.handleNameResolutionError(error);
+ }
+
+ @Override
+ public void shutdown() {
+ switchLb.shutdown();
+ }
+
+ private static final class EndpointWithHash {
+ public final EquivalentAddressGroup addressGroup;
+ public final HashCode hashCode;
+
+ public EndpointWithHash(EquivalentAddressGroup addressGroup, HashCode hashCode) {
+ this.addressGroup = addressGroup;
+ this.hashCode = hashCode;
+ }
+ }
+
+ private static final class HashAddressComparator implements Comparator {
+ @Override
+ public int compare(EndpointWithHash lhs, EndpointWithHash rhs) {
+ return Long.compare(lhs.hashCode.asLong(), rhs.hashCode.asLong());
+ }
+ }
+
+ public static final class RandomSubsettingLoadBalancerConfig {
+ public final int subsetSize;
+ public final Object childConfig;
+
+ private RandomSubsettingLoadBalancerConfig(int subsetSize, Object childConfig) {
+ this.subsetSize = subsetSize;
+ this.childConfig = childConfig;
+ }
+
+ public static class Builder {
+ int subsetSize;
+ Object childConfig;
+
+ public Builder setSubsetSize(long subsetSize) {
+ checkArgument(subsetSize > 0L, "Subset size must be greater than 0");
+ // clamping subset size to Integer.MAX_VALUE due to collection indexing limitations in JVM
+ long subsetSizeClamped = Math.min(subsetSize, (long) Integer.MAX_VALUE);
+ // safe narrowing cast due to clamping
+ this.subsetSize = (int) subsetSizeClamped;
+ return this;
+ }
+
+ public Builder setChildConfig(Object childConfig) {
+ this.childConfig = checkNotNull(childConfig, "childConfig");
+ return this;
+ }
+
+ public RandomSubsettingLoadBalancerConfig build() {
+ return new RandomSubsettingLoadBalancerConfig(
+ subsetSize,
+ checkNotNull(childConfig, "childConfig"));
+ }
+ }
+ }
+}
diff --git a/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancerProvider.java b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancerProvider.java
new file mode 100644
index 00000000000..54680823803
--- /dev/null
+++ b/util/src/main/java/io/grpc/util/RandomSubsettingLoadBalancerProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2025 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import io.grpc.Internal;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.NameResolver.ConfigOrError;
+import io.grpc.Status;
+import io.grpc.internal.JsonUtil;
+import java.util.Map;
+
+@Internal
+public final class RandomSubsettingLoadBalancerProvider extends LoadBalancerProvider {
+ private static final String POLICY_NAME = "random_subsetting";
+
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return new RandomSubsettingLoadBalancer(helper);
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return true;
+ }
+
+ @Override
+ public int getPriority() {
+ return 5;
+ }
+
+ @Override
+ public String getPolicyName() {
+ return POLICY_NAME;
+ }
+
+ @Override
+ public ConfigOrError parseLoadBalancingPolicyConfig(Map rawConfig) {
+ try {
+ return parseLoadBalancingPolicyConfigInternal(rawConfig);
+ } catch (RuntimeException e) {
+ return ConfigOrError.fromError(
+ Status.UNAVAILABLE
+ .withCause(e)
+ .withDescription("Failed parsing configuration for " + getPolicyName()));
+ }
+ }
+
+ private ConfigOrError parseLoadBalancingPolicyConfigInternal(Map rawConfig) {
+ Long subsetSize = JsonUtil.getNumberAsLong(rawConfig, "subsetSize");
+ if (subsetSize == null) {
+ return ConfigOrError.fromError(
+ Status.INTERNAL.withDescription(
+ "Subset size missing in " + getPolicyName() + ", LB policy config=" + rawConfig));
+ }
+
+ ConfigOrError childConfig = GracefulSwitchLoadBalancer.parseLoadBalancingPolicyConfig(
+ JsonUtil.getListOfObjects(rawConfig, "childPolicy"));
+ if (childConfig.getError() != null) {
+ return ConfigOrError.fromError(Status.INTERNAL
+ .withDescription(
+ "Failed to parse child in " + getPolicyName() + ", LB policy config=" + rawConfig)
+ .withCause(childConfig.getError().asRuntimeException()));
+ }
+
+ return ConfigOrError.fromConfig(
+ new RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setChildConfig(childConfig.getConfig())
+ .build());
+ }
+}
diff --git a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
index 1fdd69cb00b..d973a6f6728 100644
--- a/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
+++ b/util/src/main/resources/META-INF/services/io.grpc.LoadBalancerProvider
@@ -1,2 +1,3 @@
io.grpc.util.SecretRoundRobinLoadBalancerProvider$Provider
io.grpc.util.OutlierDetectionLoadBalancerProvider
+io.grpc.util.RandomSubsettingLoadBalancerProvider
diff --git a/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerProviderTest.java b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerProviderTest.java
new file mode 100644
index 00000000000..830ad9723d8
--- /dev/null
+++ b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerProviderTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2025 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import io.grpc.InternalServiceProviders;
+import io.grpc.LoadBalancer.Helper;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.NameResolver.ConfigOrError;
+import io.grpc.Status;
+import io.grpc.internal.JsonParser;
+import io.grpc.util.RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig;
+import java.io.IOException;
+import java.util.Map;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class RandomSubsettingLoadBalancerProviderTest {
+ private final RandomSubsettingLoadBalancerProvider provider =
+ new RandomSubsettingLoadBalancerProvider();
+
+ @Test
+ public void registered() {
+ for (LoadBalancerProvider current :
+ InternalServiceProviders.getCandidatesViaServiceLoader(
+ LoadBalancerProvider.class, getClass().getClassLoader())) {
+ if (current instanceof RandomSubsettingLoadBalancerProvider) {
+ return;
+ }
+ }
+ fail("RandomSubsettingLoadBalancerProvider not registered");
+ }
+
+ @Test
+ public void providesLoadBalancer() {
+ Helper helper = mock(Helper.class);
+ assertThat(provider.newLoadBalancer(helper))
+ .isInstanceOf(RandomSubsettingLoadBalancer.class);
+ }
+
+ @Test
+ public void parseConfigRequiresSubsetSize() throws IOException {
+ String emptyConfig = "{}";
+
+ ConfigOrError configOrError =
+ provider.parseLoadBalancingPolicyConfig(parseJsonObject(emptyConfig));
+ assertThat(configOrError.getError()).isNotNull();
+ assertThat(configOrError.getError().toString())
+ .isEqualTo(
+ Status.INTERNAL
+ .withDescription("Subset size missing in random_subsetting, LB policy config={}")
+ .toString());
+ }
+
+ @Test
+ public void parseConfigReturnsErrorWhenChildPolicyMissing() throws IOException {
+ String missingChildPolicyConfig = "{\"subsetSize\": 3}";
+
+ ConfigOrError configOrError =
+ provider.parseLoadBalancingPolicyConfig(parseJsonObject(missingChildPolicyConfig));
+ assertThat(configOrError.getError()).isNotNull();
+
+ Status error = configOrError.getError();
+ assertThat(error.getCode()).isEqualTo(Status.Code.INTERNAL);
+ assertThat(error.getDescription()).isEqualTo(
+ "Failed to parse child in random_subsetting"
+ + ", LB policy config={subsetSize=3.0}");
+ assertThat(error.getCause().getMessage()).isEqualTo("INTERNAL: No child LB config specified");
+ }
+
+ @Test
+ public void parseConfigReturnsErrorWhenChildPolicyInvalid() throws IOException {
+ String invalidChildPolicyConfig =
+ "{"
+ + "\"subsetSize\": 3, "
+ + "\"childPolicy\" : [{\"random_policy\" : {}}]"
+ + "}";
+
+ ConfigOrError configOrError =
+ provider.parseLoadBalancingPolicyConfig(parseJsonObject(invalidChildPolicyConfig));
+ assertThat(configOrError.getError()).isNotNull();
+
+ Status error = configOrError.getError();
+ assertThat(error.getCode()).isEqualTo(Status.Code.INTERNAL);
+ assertThat(error.getDescription()).isEqualTo(
+ "Failed to parse child in random_subsetting, LB policy config="
+ + "{subsetSize=3.0, childPolicy=[{random_policy={}}]}");
+ assertThat(error.getCause().getMessage()).contains(
+ "INTERNAL: None of [random_policy] specified by Service Config are available.");
+ }
+
+ @Test
+ public void parseValidConfig() throws IOException {
+ String validConfig =
+ "{"
+ + "\"subsetSize\": 3, "
+ + "\"childPolicy\" : [{\"round_robin\" : {}}]"
+ + "}";
+ ConfigOrError configOrError =
+ provider.parseLoadBalancingPolicyConfig(parseJsonObject(validConfig));
+ assertThat(configOrError.getConfig()).isNotNull();
+
+ RandomSubsettingLoadBalancerConfig actualConfig =
+ (RandomSubsettingLoadBalancerConfig) configOrError.getConfig();
+ assertThat(GracefulSwitchLoadBalancerAccessor.getChildProvider(
+ actualConfig.childConfig).getPolicyName()).isEqualTo("round_robin");
+ assertThat(actualConfig.subsetSize).isEqualTo(3);
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map parseJsonObject(String json) throws IOException {
+ return (Map) JsonParser.parse(json);
+ }
+}
diff --git a/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerTest.java
new file mode 100644
index 00000000000..7e889379e0f
--- /dev/null
+++ b/util/src/test/java/io/grpc/util/RandomSubsettingLoadBalancerTest.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2025 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import io.grpc.ConnectivityState;
+import io.grpc.ConnectivityStateInfo;
+import io.grpc.EquivalentAddressGroup;
+import io.grpc.LoadBalancer;
+import io.grpc.LoadBalancer.CreateSubchannelArgs;
+import io.grpc.LoadBalancer.ResolvedAddresses;
+import io.grpc.LoadBalancer.Subchannel;
+import io.grpc.LoadBalancer.SubchannelStateListener;
+import io.grpc.LoadBalancerProvider;
+import io.grpc.Status;
+import io.grpc.internal.TestUtils;
+import io.grpc.util.RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
+
+public class RandomSubsettingLoadBalancerTest {
+ @Rule
+ public final MockitoRule mockitoRule = MockitoJUnit.rule();
+
+ @Mock
+ private LoadBalancer.Helper mockHelper;
+ @Mock
+ private LoadBalancer mockChildLb;
+ @Mock
+ private SocketAddress mockSocketAddress;
+
+ @Captor
+ private ArgumentCaptor resolvedAddrCaptor;
+
+ private BackendDetails backendDetails;
+
+ private RandomSubsettingLoadBalancer loadBalancer;
+
+ private final LoadBalancerProvider mockChildLbProvider =
+ new TestUtils.StandardLoadBalancerProvider("foo_policy") {
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return mockChildLb;
+ }
+ };
+
+ private final LoadBalancerProvider roundRobinLbProvider =
+ new TestUtils.StandardLoadBalancerProvider("round_robin") {
+ @Override
+ public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
+ return new RoundRobinLoadBalancer(helper);
+ }
+ };
+
+ private Object newChildConfig(LoadBalancerProvider provider, Object config) {
+ return GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(provider, config);
+ }
+
+ private RandomSubsettingLoadBalancerConfig createRandomSubsettingLbConfig(
+ int subsetSize, LoadBalancerProvider childLbProvider, Object childConfig) {
+ return new RandomSubsettingLoadBalancer.RandomSubsettingLoadBalancerConfig.Builder()
+ .setSubsetSize(subsetSize)
+ .setChildConfig(newChildConfig(childLbProvider, childConfig))
+ .build();
+ }
+
+ private BackendDetails setupBackends(int backendCount) {
+ List servers = Lists.newArrayList();
+ Map, Subchannel> subchannels = Maps.newLinkedHashMap();
+
+ for (int i = 0; i < backendCount; i++) {
+ SocketAddress addr = new FakeSocketAddress("server" + i);
+ EquivalentAddressGroup addressGroup = new EquivalentAddressGroup(addr);
+ servers.add(addressGroup);
+ Subchannel subchannel = mock(Subchannel.class);
+ subchannels.put(Arrays.asList(addressGroup), subchannel);
+ }
+
+ return new BackendDetails(servers, subchannels);
+ }
+
+ @Before
+ public void setUp() {
+ loadBalancer = new RandomSubsettingLoadBalancer(mockHelper);
+
+ int backendSize = 5;
+ backendDetails = setupBackends(backendSize);
+ }
+
+ @Test
+ public void handleNameResolutionError() {
+ int subsetSize = 2;
+ Object childConfig = "someConfig";
+
+ RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig(
+ subsetSize, mockChildLbProvider, childConfig);
+
+ loadBalancer.acceptResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress)))
+ .setLoadBalancingPolicyConfig(config)
+ .build());
+
+ loadBalancer.handleNameResolutionError(Status.DEADLINE_EXCEEDED);
+ verify(mockChildLb).handleNameResolutionError(Status.DEADLINE_EXCEEDED);
+ }
+
+ @Test
+ public void shutdown() {
+ int subsetSize = 2;
+ Object childConfig = "someConfig";
+
+ RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig(
+ subsetSize, mockChildLbProvider, childConfig);
+
+ loadBalancer.acceptResolvedAddresses(
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.of(new EquivalentAddressGroup(mockSocketAddress)))
+ .setLoadBalancingPolicyConfig(config)
+ .build());
+
+ loadBalancer.shutdown();
+ verify(mockChildLb).shutdown();
+ }
+
+ @Test
+ public void acceptResolvedAddresses_mockedChildLbPolicy() {
+ int subsetSize = 3;
+ Object childConfig = "someConfig";
+
+ RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig(
+ subsetSize, mockChildLbProvider, childConfig);
+
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(backendDetails.servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+
+ verify(mockChildLb).acceptResolvedAddresses(resolvedAddrCaptor.capture());
+ assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(subsetSize);
+ assertThat(resolvedAddrCaptor.getValue().getLoadBalancingPolicyConfig()).isEqualTo(childConfig);
+ }
+
+ @Test
+ public void acceptResolvedAddresses_roundRobinChildLbPolicy() {
+ int subsetSize = 3;
+ Object childConfig = null;
+
+ RandomSubsettingLoadBalancerConfig config = createRandomSubsettingLbConfig(
+ subsetSize, roundRobinLbProvider, childConfig);
+
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(backendDetails.servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+
+ int insubset = 0;
+ for (Subchannel subchannel : backendDetails.subchannels.values()) {
+ LoadBalancer.SubchannelStateListener ssl =
+ backendDetails.subchannelStateListeners.get(subchannel);
+ if (ssl != null) { // it might be null if it's not in the subset.
+ insubset += 1;
+ ssl.onSubchannelState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
+ }
+ }
+
+ assertThat(insubset).isEqualTo(subsetSize);
+ }
+
+ // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting100-100-5.png
+ @Test
+ public void backendsCanBeDistributedEvenly_subsetting100_100_5() {
+ verifyConnectionsByServer(100, 100, 5, 15);
+ }
+
+ // verifies https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting100-100-25.png
+ @Test
+ public void backendsCanBeDistributedEvenly_subsetting100_100_25() {
+ verifyConnectionsByServer(100, 100, 25, 40);
+ }
+
+ // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting100-10-5.png
+ @Test
+ public void backendsCanBeDistributedEvenly_subsetting100_10_5() {
+ verifyConnectionsByServer(100, 10, 5, 70);
+ }
+
+ // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting500-10-5.png
+ @Test
+ public void backendsCanBeDistributedEvenly_subsetting500_10_5() {
+ verifyConnectionsByServer(500, 10, 5, 600);
+ }
+
+ // verifies: https://github.com/grpc/proposal/blob/master/A68_graphics/subsetting2000-10-5.png
+ @Test
+ public void backendsCanBeDistributedEvenly_subsetting2000_100_5() {
+ verifyConnectionsByServer(2000, 10, 5, 1200);
+ }
+
+ public void verifyConnectionsByServer(
+ int clientsCount, int serversCount, int subsetSize, int expectedMaxConnections) {
+ backendDetails = setupBackends(serversCount);
+ Object childConfig = "someConfig";
+
+ List configs = Lists.newArrayList();
+ for (int i = 0; i < clientsCount; i++) {
+ configs.add(createRandomSubsettingLbConfig(subsetSize, mockChildLbProvider, childConfig));
+ }
+
+ Map connectionsByServer = Maps.newLinkedHashMap();
+
+ for (RandomSubsettingLoadBalancerConfig config : configs) {
+ ResolvedAddresses resolvedAddresses =
+ ResolvedAddresses.newBuilder()
+ .setAddresses(ImmutableList.copyOf(backendDetails.servers))
+ .setLoadBalancingPolicyConfig(config)
+ .build();
+
+ loadBalancer = new RandomSubsettingLoadBalancer(mockHelper);
+ loadBalancer.acceptResolvedAddresses(resolvedAddresses);
+
+ verify(mockChildLb, atLeastOnce()).acceptResolvedAddresses(resolvedAddrCaptor.capture());
+ // Verify ChildLB is only getting subsetSize ResolvedAddresses each time
+ assertThat(resolvedAddrCaptor.getValue().getAddresses().size()).isEqualTo(config.subsetSize);
+
+ for (EquivalentAddressGroup eag : resolvedAddrCaptor.getValue().getAddresses()) {
+ for (SocketAddress addr : eag.getAddresses()) {
+ Integer prev = connectionsByServer.getOrDefault(addr, 0);
+ connectionsByServer.put(addr, prev + 1);
+ }
+ }
+ }
+
+ int maxConnections = Collections.max(connectionsByServer.values());
+
+ assertThat(maxConnections).isAtMost(expectedMaxConnections);
+ }
+
+ private class BackendDetails {
+ private final List servers;
+ private final Map, Subchannel> subchannels;
+ private final Map subchannelStateListeners;
+
+ BackendDetails(List servers,
+ Map, Subchannel> subchannels) {
+ this.servers = servers;
+ this.subchannels = subchannels;
+ this.subchannelStateListeners = Maps.newLinkedHashMap();
+
+ when(mockHelper.createSubchannel(any(LoadBalancer.CreateSubchannelArgs.class))).then(
+ new Answer() {
+ @Override
+ public Subchannel answer(InvocationOnMock invocation) throws Throwable {
+ CreateSubchannelArgs args = (CreateSubchannelArgs) invocation.getArguments()[0];
+ final Subchannel subchannel = backendDetails.subchannels.get(args.getAddresses());
+ when(subchannel.getAllAddresses()).thenReturn(args.getAddresses());
+ when(subchannel.getAttributes()).thenReturn(args.getAttributes());
+ doAnswer(new Answer() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ subchannelStateListeners.put(subchannel,
+ (SubchannelStateListener) invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(subchannel).start(any(SubchannelStateListener.class));
+ return subchannel;
+ }
+ });
+ }
+ }
+
+ private static class FakeSocketAddress extends SocketAddress {
+ final String name;
+
+ FakeSocketAddress(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return "FakeSocketAddress-" + name;
+ }
+ }
+}
diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
index e08ea0fab43..c934eb5843e 100644
--- a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
+++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java
@@ -32,6 +32,7 @@
import io.envoyproxy.envoy.extensions.load_balancing_policies.client_side_weighted_round_robin.v3.ClientSideWeightedRoundRobin;
import io.envoyproxy.envoy.extensions.load_balancing_policies.least_request.v3.LeastRequest;
import io.envoyproxy.envoy.extensions.load_balancing_policies.pick_first.v3.PickFirst;
+import io.envoyproxy.envoy.extensions.load_balancing_policies.random_subsetting.v3.RandomSubsetting;
import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash;
import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin;
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
@@ -92,6 +93,9 @@ class LoadBalancerConfigFactory {
static final String ERROR_UTILIZATION_PENALTY = "errorUtilizationPenalty";
+ static final String RANDOM_SUBSETTING_FIELD_NAME = "random_subsetting";
+ static final String SUBSET_SIZE = "subsetSize";
+
/**
* Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link
* Cluster}.
@@ -200,6 +204,20 @@ class LoadBalancerConfigFactory {
return ImmutableMap.of(PICK_FIRST_FIELD_NAME, configBuilder.buildOrThrow());
}
+ /**
+ * Builds a service config JSON object for the random_subsetting load balancer config based on the
+ * given config values.
+ */
+ private static ImmutableMap buildRandomSubsettingConfig(
+ RandomSubsetting randomSubsetting) {
+ return ImmutableMap.of(
+ RANDOM_SUBSETTING_FIELD_NAME,
+ ImmutableMap.of(
+ SUBSET_SIZE, randomSubsetting.getSubsetSize(),
+ CHILD_POLICY_FIELD, randomSubsetting.getChildPolicy()
+ ));
+ }
+
/**
* Responsible for converting from a {@code envoy.config.cluster.v3.LoadBalancingPolicy} proto
* message to a gRPC service config format.
@@ -236,6 +254,9 @@ static class LoadBalancingPolicyConverter {
typedConfig.unpack(ClientSideWeightedRoundRobin.class));
} else if (typedConfig.is(PickFirst.class)) {
serviceConfig = convertPickFirstConfig(typedConfig.unpack(PickFirst.class));
+ } else if (typedConfig.is(RandomSubsetting.class)) {
+ serviceConfig = convertRandomSubsettingConfig(
+ typedConfig.unpack(RandomSubsetting.class));
} else if (typedConfig.is(com.github.xds.type.v3.TypedStruct.class)) {
serviceConfig = convertCustomConfig(
typedConfig.unpack(com.github.xds.type.v3.TypedStruct.class));
@@ -324,6 +345,14 @@ static class LoadBalancingPolicyConverter {
return buildPickFirstConfig(pickFirst.getShuffleAddressList());
}
+ /**
+ * "Converts" a random_subsetting configuration to service config format.
+ */
+ private static ImmutableMap convertRandomSubsettingConfig(
+ RandomSubsetting randomSubsetting) {
+ return buildRandomSubsettingConfig(randomSubsetting);
+ }
+
/**
* Converts a least_request {@link Any} configuration to service config format.
*/
diff --git a/xds/third_party/envoy/src/main/proto/envoy/extensions/load_balancing_policies/random_subsetting/v3/random_subsetting.proto b/xds/third_party/envoy/src/main/proto/envoy/extensions/load_balancing_policies/random_subsetting/v3/random_subsetting.proto
new file mode 100644
index 00000000000..0690397f770
--- /dev/null
+++ b/xds/third_party/envoy/src/main/proto/envoy/extensions/load_balancing_policies/random_subsetting/v3/random_subsetting.proto
@@ -0,0 +1,33 @@
+syntax = "proto3";
+
+package envoy.extensions.load_balancing_policies.random_subsetting.v3;
+
+import "envoy/config/cluster/v3/cluster.proto";
+
+import "google/protobuf/wrappers.proto";
+
+import "udpa/annotations/status.proto";
+import "validate/validate.proto";
+
+option java_package = "io.envoyproxy.envoy.extensions.load_balancing_policies.random_subsetting.v3";
+option java_outer_classname = "RandomSubsettingProto";
+option java_multiple_files = true;
+option go_package = "github.com/envoyproxy/go-control-plane/envoy/extensions/load_balancing_policies/random_subsetting/v3;random_subsettingv3";
+option (udpa.annotations.file_status).package_version_status = ACTIVE;
+
+// [#protodoc-title: Random Subsetting Load Balancing Policy]
+// [#extension: envoy.load_balancing_policies.random_subsetting]
+
+message RandomSubsetting {
+ // subset_size indicates how many backends every client will be connected to.
+ // The value must be greater than 0.
+ google.protobuf.UInt32Value subset_size = 1 [
+ (validate.rules).uint32 = {gt: 0}
+ ];
+
+ // The config for the child policy.
+ // The value is required.
+ config.cluster.v3.LoadBalancingPolicy child_policy = 2 [
+ (validate.rules).message = {required: true}
+ ];
+}