Skip to content

Handle exceptions and timeouts new #767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
|spring.cloud.inetutils.use-only-site-local-interfaces | false | Whether to use only interfaces with site local addresses. See {@link InetAddress#isSiteLocalAddress()} for more details.
|spring.cloud.loadbalancer.cache.caffeine.spec | | The spec to use to create caches. See CaffeineSpec for more details on the spec format.
|spring.cloud.loadbalancer.cache.capacity | 256 | Initial cache capacity expressed as int.
|spring.cloud.loadbalancer.cache.enabled | true | Enables Spring Cloud LoadBalancer caching mechanism.
|spring.cloud.loadbalancer.cache.ttl | 35s | Time To Live - time counted from writing of the record, after which cache entries are expired, expressed as a {@link Duration}. The property {@link String} has to be in keeping with the appropriate syntax as specified in Spring Boot <code>StringToDurationConverter</code>. @see <a href= "https://github.com/spring-projects/spring-boot/blob/master/spring-boot-project/spring-boot/src/main/java/org/springframework/boot/convert/StringToDurationConverter.java">StringToDurationConverter.java</a>
|spring.cloud.loadbalancer.health-check.initial-delay | 0 | Initial delay value for the HealthCheck scheduler.
|spring.cloud.loadbalancer.health-check.interval | 25s | Interval for rerunning the HealthCheck scheduler.
|spring.cloud.loadbalancer.health-check.path | |
|spring.cloud.loadbalancer.retry.enabled | true |
|spring.cloud.loadbalancer.ribbon.enabled | true | Causes `RibbonLoadBalancerClient` to be used by default.
|spring.cloud.loadbalancer.service-discovery.timeout | | String representation of Duration of the timeout for calls to service discovery.
|spring.cloud.loadbalancer.zone | | Spring Cloud LoadBalancer zone.
|spring.cloud.refresh.enabled | true | Enables autoconfiguration for the refresh scope and associated features.
|spring.cloud.refresh.extra-refreshable | true | Additional class names for beans to post process into refresh scope.
Expand Down
14 changes: 10 additions & 4 deletions docs/src/main/asciidoc/spring-cloud-commons.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -938,16 +938,21 @@ TIP: This mechanism is particularly helpful while using the `SimpleDiscoveryClie
clients backed by an actual Service Registry, it's not necessary to use, as we already get
healthy instances after querying the external ServiceDiscovery.

TIP:: This supplier is also recommended for setups with a small number of instances per service
in order to avoid retrying calls on a failing instance.

`HealthCheckServiceInstanceListSupplier` uses properties prefixed with
`spring.cloud.loadbalancer.healthcheck`. You can set the `initialDelay` and `interval`
for the scheduler. You can set the default path for the healthcheck URL by setting
the value of the `spring.cloud.loadbalancer.healthcheck.path.default`. You can also set a specific value
for any given service by setting the value of the `spring.cloud.loadbalancer.healthcheck.path.[SERVICE_ID]`, substituting the `[SERVICE_ID]` with the correct ID of your service. If the path is not set, `/actuator/health` is used by default.

TIP:: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.

In order to use the health-check scheduler approach, you will have to instantiate a `HealthCheckServiceInstanceListSupplier` bean in a <<custom-loadbalancer-configuration,custom configuration>>.

We use delegates to work with `ServiceInstanceListSupplier` beans.
We suggest passing a `DiscoveryClientServiceInstanceListSupplier` delegate in the constructor of `HealthCheckServiceInstanceListSupplier` and, in turn, wrapping the latter with a `CachingServiceInstanceListSupplier` to leverage <<loadbalancer-caching, LoadBalancer caching mechanism>>.
We suggest passing a `DiscoveryClientServiceInstanceListSupplier` delegate in the constructor of `HealthCheckServiceInstanceListSupplier`.

You could use this sample configuration to set it up:

Expand All @@ -969,9 +974,6 @@ public class CustomLoadBalancerConfiguration {

NOTE:: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`, therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.

TIP:: In order to make working on your own LoadBalancer configuration easier, we have added a `builder()` method to the `ServiceInstanceListSupplier` class.

TIP:: You can also use our alternative predefined configurations in place of the default ones by setting the value of `spring.cloud.loadbalancer.configurations` property to `zone-preference` to use `ZonePreferenceServiceInstanceListSupplier` with caching or to `health-check` to use `HealthCheckServiceInstanceListSupplier` with caching.

[[spring-cloud-loadbalancer-starter]]
=== Spring Cloud LoadBalancer Starter
Expand Down Expand Up @@ -1005,6 +1007,10 @@ public class MyConfiguration {
}
}
----

TIP:: In order to make working on your own LoadBalancer configuration easier, we have added a `builder()` method to the `ServiceInstanceListSupplier` class.

TIP:: You can also use our alternative predefined configurations in place of the default ones by setting the value of `spring.cloud.loadbalancer.configurations` property to `zone-preference` to use `ZonePreferenceServiceInstanceListSupplier` with caching or to `health-check` to use `HealthCheckServiceInstanceListSupplier` with caching.
====

You can use this feature to instantiate different implementations of `ServiceInstanceListSupplier` or `ReactorLoadBalancer`, either written by you, or provided by us as alternatives (for example `ZonePreferenceServiceInstanceListSupplier`) to override the default setup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import org.springframework.boot.convert.DurationStyle;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
Expand All @@ -39,23 +44,47 @@
public class DiscoveryClientServiceInstanceListSupplier
implements ServiceInstanceListSupplier {

/**
* Property that establishes the timeout for calls to service discovery.
*/
public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";

private static final Log LOG = LogFactory
.getLog(DiscoveryClientServiceInstanceListSupplier.class);

private Duration timeout = Duration.ofSeconds(30);

private final String serviceId;

private final Flux<List<ServiceInstance>> serviceInstances;

public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate,
Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
resolveTimeout(environment);
this.serviceInstances = Flux
.defer(() -> Flux.just(delegate.getInstances(serviceId)))
.subscribeOn(Schedulers.boundedElastic());
.timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
}).subscribeOn(Schedulers.boundedElastic());
}

public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate,
Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
this.serviceInstances = Flux
.defer(() -> delegate.getInstances(serviceId).collectList().flux());
resolveTimeout(environment);
this.serviceInstances = Flux.defer(() -> delegate.getInstances(serviceId)
.collectList().flux().timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
}));
}

@Override
Expand All @@ -68,4 +97,28 @@ public Flux<List<ServiceInstance>> get() {
return serviceInstances;
}

private void resolveTimeout(Environment environment) {
String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT);
if (providedTimeout != null) {
timeout = DurationStyle.detectAndParse(providedTimeout);
}
}

private void logTimeout() {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Timeout occurred while retrieving instances for service %s."
+ "The instances could not be retrieved during %s",
serviceId, timeout));
}
}

private void logException(Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Exception occurred while retrieving instances for service %s",
serviceId), error);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@
"name": "spring.cloud.loadbalancer.zone",
"type": "java.lang.String",
"description": "Spring Cloud LoadBalancer zone."
},
{
"name": "spring.cloud.loadbalancer.service-discovery.timeout",
"description": "String representation of Duration of the timeout for calls to service discovery.",
"type": "java.lang.String"
},
{
"defaultValue": true,
"name": "spring.cloud.loadbalancer.cache.enabled",
"description": "Enables Spring Cloud LoadBalancer caching mechanism.",
"type": "java.lang.Boolean"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;

import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;
import org.mockito.internal.stubbing.answers.Returns;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

Expand All @@ -29,6 +33,7 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.cloud.loadbalancer.core.DiscoveryClientServiceInstanceListSupplier.SERVICE_DISCOVERY_TIMEOUT;

/**
* Tests for {@link DiscoveryClientServiceInstanceListSupplier}.
Expand All @@ -39,6 +44,8 @@ class DiscoveryClientServiceInstanceListSupplierTests {

private static final String SERVICE_ID = "test";

private static final Duration VERIFICATION_TIMEOUT = Duration.ofSeconds(10);

private final MockEnvironment environment = new MockEnvironment();

private final ReactiveDiscoveryClient reactiveDiscoveryClient = mock(
Expand Down Expand Up @@ -66,9 +73,10 @@ void shouldReturnRetrievedInstances() {
supplier = new DiscoveryClientServiceInstanceListSupplier(
reactiveDiscoveryClient, environment);
return supplier.get();
}).expectSubscription().expectNext(
Lists.list(instance("1host", false), instance("2host-secure", true)))
.thenCancel().verify();
}).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true)))
.thenCancel().verify(VERIFICATION_TIMEOUT);
}

@Test
Expand All @@ -81,7 +89,7 @@ void shouldUpdateReturnRetrievedInstances() {
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true)))
.thenCancel().verify();
.thenCancel().verify(VERIFICATION_TIMEOUT);

when(reactiveDiscoveryClient.getInstances(SERVICE_ID))
.thenReturn(Flux.just(instance("1host", false),
Expand All @@ -90,7 +98,31 @@ void shouldUpdateReturnRetrievedInstances() {
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true), instance("3host", false)))
.thenCancel().verify();
.thenCancel().verify(VERIFICATION_TIMEOUT);
}

@Test
void shouldReturnEmptyInstancesListOnException() {
when(reactiveDiscoveryClient.getInstances(SERVICE_ID))
.thenReturn(Flux.error(new RuntimeException("Exception")));

StepVerifier.withVirtualTime(() -> {
supplier = new DiscoveryClientServiceInstanceListSupplier(
reactiveDiscoveryClient, environment);
return supplier.get();
}).expectSubscription().expectNext(Lists.emptyList()).thenCancel()
.verify(VERIFICATION_TIMEOUT);
}

@Test
void shouldReturnEmptyInstancesListOnTimeout() {
environment.setProperty(SERVICE_DISCOVERY_TIMEOUT, "100ms");
when(reactiveDiscoveryClient.getInstances(SERVICE_ID)).thenReturn(Flux.never());
StepVerifier
.create(new DiscoveryClientServiceInstanceListSupplier(
reactiveDiscoveryClient, environment).get())
.expectSubscription().expectNext(Lists.emptyList()).thenCancel()
.verify(VERIFICATION_TIMEOUT);
}

@Test
Expand All @@ -102,9 +134,10 @@ void shouldReturnRetrievedInstancesBlockingClient() {
supplier = new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
environment);
return supplier.get();
}).expectSubscription().expectNext(
Lists.list(instance("1host", false), instance("2host-secure", true)))
.thenCancel().verify();
}).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true)))
.thenCancel().verify(VERIFICATION_TIMEOUT);
}

@Test
Expand All @@ -123,7 +156,33 @@ void shouldUpdateReturnRetrievedInstancesBlockingClient() {
}).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true), instance("3host", false)))
.thenCancel().verify();
.thenCancel().verify(VERIFICATION_TIMEOUT);
}

@Test
void shouldReturnEmptyInstancesListOnExceptionBlockingClient() {
when(discoveryClient.getInstances(SERVICE_ID))
.thenThrow(new RuntimeException("Exception"));

StepVerifier.withVirtualTime(() -> {
supplier = new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
environment);
return supplier.get();
}).expectSubscription().expectNext(Lists.emptyList()).thenCancel()
.verify(VERIFICATION_TIMEOUT);
}

@Test
void shouldReturnEmptyInstancesListOnTimeoutBlockingClient() {
environment.setProperty(SERVICE_DISCOVERY_TIMEOUT, "100ms");
when(discoveryClient.getInstances(SERVICE_ID)).thenAnswer(
new AnswersWithDelay(200, new Returns(Lists.list(instance("1host", false),
instance("2host-secure", true), instance("3host", false)))));
StepVerifier
.create(new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
environment).get())
.expectSubscription().expectNext(Lists.emptyList()).thenCancel()
.verify(VERIFICATION_TIMEOUT);
}

}