Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow injecting load balancer in addition to creating with reflection #1641

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class DefaultClientChannelManager implements ClientChannelManager {

public static final String METRIC_PREFIX = "connectionpool";

private final Resolver <? extends DiscoveryResult> dynamicServerResolver;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DiscoveryResult is a final class

private final Resolver<DiscoveryResult> dynamicServerResolver;
private final ConnectionPoolConfig connPoolConfig;
private final IClientConfig clientConfig;
private final Registry spectatorRegistry;
Expand Down Expand Up @@ -100,40 +100,12 @@ public class DefaultClientChannelManager implements ClientChannelManager {

public DefaultClientChannelManager(
OriginName originName, IClientConfig clientConfig, Registry spectatorRegistry) {
this.originName = Objects.requireNonNull(originName, "originName");
this.dynamicServerResolver = new DynamicServerResolver(clientConfig, new ServerPoolListener());

String metricId = originName.getMetricId();

this.clientConfig = clientConfig;
this.spectatorRegistry = spectatorRegistry;
this.perServerPools = new ConcurrentHashMap<>(200);

this.connPoolConfig = new ConnectionPoolConfigImpl(originName, this.clientConfig);

this.createNewConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create", metricId);
this.createConnSucceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_success", metricId);
this.createConnFailedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_create_fail", metricId);

this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId);
this.closeAbovePoolHighWaterMarkCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId);
this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId);
this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId);
this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId);
this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId);
this.alreadyClosedCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_alreadyClosed", metricId);
this.connTakenFromPoolIsNotOpen = SpectatorUtils.newCounter(METRIC_PREFIX + "_fromPoolIsClosed", metricId);
this.maxConnsPerHostExceededCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_maxConnsPerHostExceeded", metricId);
this.closeWrtBusyConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeWrtBusyConnCounter", metricId);
this.connEstablishTimer = PercentileTimer.get(spectatorRegistry, spectatorRegistry.createId(METRIC_PREFIX + "_createTiming", "id", metricId));
this.connsInPool = SpectatorUtils.newGauge(METRIC_PREFIX + "_inPool", metricId, new AtomicInteger());
this.connsInUse = SpectatorUtils.newGauge(METRIC_PREFIX + "_inUse", metricId, new AtomicInteger());
this(originName, clientConfig, new DynamicServerResolver(clientConfig), spectatorRegistry);
}

@VisibleForTesting
public DefaultClientChannelManager(
OriginName originName, IClientConfig clientConfig,
Resolver<? extends DiscoveryResult> resolver, Registry spectatorRegistry) {
Resolver<DiscoveryResult> resolver, Registry spectatorRegistry) {
this.originName = Objects.requireNonNull(originName, "originName");
this.dynamicServerResolver = resolver;

Expand All @@ -151,7 +123,7 @@ public DefaultClientChannelManager(

this.closeConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_close", metricId);
this.closeAbovePoolHighWaterMarkCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeAbovePoolHighWaterMark", metricId);
this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "__closeExpiredConnLifetime", metricId);
this.closeExpiredConnLifetimeCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_closeExpiredConnLifetime", metricId);
this.requestConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_request", metricId);
this.reuseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_reuse", metricId);
this.releaseConnCounter = SpectatorUtils.newCounter(METRIC_PREFIX + "_release", metricId);
Expand All @@ -167,6 +139,7 @@ public DefaultClientChannelManager(
@Override
public void init()
{
dynamicServerResolver.setListener(new ServerPoolListener());
// Load channel initializer and conn factory.
// We don't do this within the constructor because some subclass may not be initialized until post-construct.
this.channelInitializer = createChannelInitializer(clientConfig, connPoolConfig, spectatorRegistry);
Expand Down Expand Up @@ -413,7 +386,6 @@ protected IConnectionPool createConnectionPool(
}

final class ServerPoolListener implements ResolverListener<DiscoveryResult> {

@Override
public void onChange(List<DiscoveryResult> removedSet) {
if (!removedSet.isEmpty()) {
Expand All @@ -427,7 +399,6 @@ public void onChange(List<DiscoveryResult> removedSet) {
}
}
}

}

@Override
Expand Down Expand Up @@ -477,4 +448,5 @@ static SocketAddress pickAddressInternal(ResolverResult chosenServer, @Nullable
protected SocketAddress pickAddress(DiscoveryResult chosenServer) {
return pickAddressInternal(chosenServer, connPoolConfig.getOriginName());
}

}
2 changes: 1 addition & 1 deletion zuul-discovery/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ dependencies {
implementation libraries.guava
implementation libraries.slf4j

implementation "com.netflix.ribbon:ribbon-loadbalancer:${versions_ribbon}"
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

had to change this because I added the constructor that takes a DynamicServerListLoadBalancer

api "com.netflix.ribbon:ribbon-loadbalancer:${versions_ribbon}"
implementation "com.netflix.ribbon:ribbon-core:${versions_ribbon}"
implementation "com.netflix.ribbon:ribbon-eureka:${versions_ribbon}"
implementation "com.netflix.ribbon:ribbon-archaius:${versions_ribbon}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import com.netflix.zuul.resolver.ResolverListener;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Argha C
Expand All @@ -40,15 +43,37 @@
*/
public class DynamicServerResolver implements Resolver<DiscoveryResult> {

private static final Logger LOG = LoggerFactory.getLogger(DynamicServerResolver.class);

private final DynamicServerListLoadBalancer<?> loadBalancer;
ResolverListener<DiscoveryResult> listener;
private ResolverListener<DiscoveryResult> listener;

@Deprecated
public DynamicServerResolver(IClientConfig clientConfig, ResolverListener<DiscoveryResult> listener) {
this.loadBalancer = createLoadBalancer(clientConfig);
this.loadBalancer.addServerListChangeListener(this::onUpdate);
this.listener = listener;
}

public DynamicServerResolver(IClientConfig clientConfig) {
this(createLoadBalancer(clientConfig));
}

public DynamicServerResolver(DynamicServerListLoadBalancer<?> loadBalancer) {
this.loadBalancer = Objects.requireNonNull(loadBalancer);
}

@Override
public void setListener(ResolverListener<DiscoveryResult> listener) {
if(this.listener != null) {
LOG.warn("Ignoring call to setListener, because a listener was already set");
return;
}

this.listener = Objects.requireNonNull(listener);
this.loadBalancer.addServerListChangeListener(this::onUpdate);
}

@Override
public DiscoveryResult resolve(@Nullable Object key) {
final Server server = loadBalancer.chooseServer(key);
Expand All @@ -65,7 +90,7 @@ public void shutdown() {
loadBalancer.shutdown();
}

private DynamicServerListLoadBalancer<?> createLoadBalancer(IClientConfig clientConfig) {
private static DynamicServerListLoadBalancer<?> createLoadBalancer(IClientConfig clientConfig) {
//TODO(argha-c): Revisit this style of LB initialization post modularization. Ideally the LB should be pluggable.

// Use a hard coded string for the LB default name to avoid a dependency on Ribbon classes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface Resolver<T> {
* hook to perform activities on shutdown
*/
void shutdown();

default void setListener(ResolverListener<T> listener) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public List<DiscoveryResult> updatedList() {
}

final CustomListener listener = new CustomListener();
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl(), listener);
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl());
resolver.setListener(listener);

final InstanceInfo first = Builder.newBuilder()
.setAppName("zuul-discovery-1")
Expand All @@ -73,11 +74,7 @@ public List<DiscoveryResult> updatedList() {

@Test
void properSentinelValueWhenServersUnavailable() {
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl(), new ResolverListener<DiscoveryResult>() {
@Override
public void onChange(List<DiscoveryResult> removedSet) {
}
});
final DynamicServerResolver resolver = new DynamicServerResolver(new DefaultClientConfigImpl());

final DiscoveryResult nonExistentServer = resolver.resolve(null);

Expand Down