Allow refetching instances for healthcheck #855

Merged
merged 3 commits into from
Dec 1, 2020
7 changes: 3 additions & 4 deletions docs/src/main/asciidoc/_configprops.adoc
@@ -10,10 +10,6 @@
|spring.cloud.discovery.client.health-indicator.enabled | true |
|spring.cloud.discovery.client.health-indicator.include-description | false |
|spring.cloud.discovery.client.simple.instances | |
|spring.cloud.discovery.client.simple.local.instance-id | | The unique identifier or name for the service instance.
|spring.cloud.discovery.client.simple.local.metadata | | Metadata for the service instance. Can be used by discovery clients to modify their behaviour per instance, e.g. when load balancing.
|spring.cloud.discovery.client.simple.local.service-id | | The identifier or name for the service. Multiple instances might share the same service ID.
|spring.cloud.discovery.client.simple.local.uri | | The URI of the service instance. Will be parsed to extract the scheme, host, and port.
|spring.cloud.discovery.client.simple.order | |
|spring.cloud.discovery.enabled | true | Enables discovery client health indicators.
|spring.cloud.features.enabled | true | Enables the features endpoint.
@@ -34,6 +30,9 @@
|spring.cloud.loadbalancer.health-check.initial-delay | 0 | Initial delay value for the HealthCheck scheduler.
|spring.cloud.loadbalancer.health-check.interval | 25s | Interval for rerunning the HealthCheck scheduler.
|spring.cloud.loadbalancer.health-check.path | |
|spring.cloud.loadbalancer.health-check.refetch-instances | false | Indicates whether the instances should be refetched by the <code>HealthCheckServiceInstanceListSupplier</code>. This can be used if the instances can be updated and the underlying delegate does not provide an ongoing flux.
|spring.cloud.loadbalancer.health-check.refetch-instances-interval | 25s | Interval for refetching available service instances.
|spring.cloud.loadbalancer.health-check.repeat-health-check | true | Indicates whether health checks should keep repeating. It might be useful to set it to <code>false</code> if periodically refetching the instances, as every refetch will also trigger a healthcheck.
|spring.cloud.loadbalancer.retry.enabled | true |
|spring.cloud.loadbalancer.retry.max-retries-on-next-service-instance | 1 | Number of retries to be executed on the next <code>ServiceInstance</code>. A <code>ServiceInstance</code> is chosen before each retry call.
|spring.cloud.loadbalancer.retry.max-retries-on-same-service-instance | 0 | Number of retries to be executed on the same <code>ServiceInstance</code>.
12 changes: 9 additions & 3 deletions docs/src/main/asciidoc/spring-cloud-commons.adoc
@@ -953,16 +953,22 @@ TIP: This mechanism is particularly helpful while using the `SimpleDiscoveryClie
clients backed by an actual Service Registry, it's not necessary to use, as we already get
healthy instances after querying the external ServiceDiscovery.

TIP:: This supplier is also recommended for setups with a small number of instances per service
TIP: This supplier is also recommended for setups with a small number of instances per service
in order to avoid retrying calls on a failing instance.

WARNING: If using any of the Service Discovery-backed suppliers, adding this health-check mechanism is usually not necessary, as we retrieve the health state of the instances directly
from the Service Registry.

TIP: The `HealthCheckServiceInstanceListSupplier` relies on having updated instances provided by a delegate flux. In the rare cases when you want to use a delegate that does not refresh the instances, even though the list of instances may change (such as the `ReactiveDiscoveryClientServiceInstanceListSupplier` provided by us), you can set `spring.cloud.loadbalancer.health-check.refetch-instances` to `true` to have the instance list refreshed by the `HealthCheckServiceInstanceListSupplier`. You can then also adjust the refetch interval by modifying the value of `spring.cloud.loadbalancer.health-check.refetch-instances-interval` and opt to disable the additional healthcheck repetitions by setting `spring.cloud.loadbalancer.health-check.repeat-health-check` to `false`, as every instance refetch
will also trigger a healthcheck.
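
As a rough illustration of the refetch behaviour described in the tip above, the following is a simplified, synchronous model in plain Java. It does not use Reactor or any Spring Cloud class: `RefetchSketch`, `poll`, and the `rounds` parameter are hypothetical names introduced only for this sketch, and timing (the refetch interval) as well as the `repeat-health-check` flag are deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

class RefetchSketch {

    // Hypothetical model: returns the list of alive instances emitted per fetch.
    static List<List<String>> poll(Supplier<List<String>> delegate,
            Predicate<String> isAlive, boolean refetchInstances, int rounds) {
        List<List<String>> emitted = new ArrayList<>();
        // Without refetching, the delegate is queried exactly once.
        int queries = refetchInstances ? rounds : 1;
        for (int i = 0; i < queries; i++) {
            List<String> alive = new ArrayList<>();
            // Every (re)fetch is followed by a health check of the fetched instances.
            for (String instance : delegate.get()) {
                if (isAlive.test(instance)) {
                    alive.add(instance);
                }
            }
            emitted.add(Collections.unmodifiableList(alive));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // A delegate whose answer changes between calls (assumed for illustration).
        Iterator<List<String>> answers = Arrays.asList(
                Collections.singletonList("127.0.0.1"),
                Arrays.asList("127.0.0.1", "127.0.0.2")).iterator();
        Supplier<List<String>> delegate = answers::next;

        // Prints [[127.0.0.1], [127.0.0.1, 127.0.0.2]]
        System.out.println(poll(delegate, instance -> true, true, 2));
    }
}
```

With `refetchInstances` set to `false`, the same call would query the delegate only once, so the second instance would not be picked up in this model.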

`HealthCheckServiceInstanceListSupplier` uses properties prefixed with
`spring.cloud.loadbalancer.health-check`. You can set the `initialDelay` and `interval`
for the scheduler. You can set the default path for the healthcheck URL by setting
the value of the `spring.cloud.loadbalancer.health-check.path.default`. You can also set a specific value
for any given service by setting the value of the `spring.cloud.loadbalancer.health-check.path.[SERVICE_ID]`, substituting the `[SERVICE_ID]` with the correct ID of your service. If the path is not set, `/actuator/health` is used by default.
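
The path lookup described above can be sketched in plain Java as follows. This is a minimal sketch, not the supplier's actual code: `HealthCheckPathSketch` and `resolvePath` are hypothetical names, the service IDs are made up, and a `TreeMap` with a case-insensitive comparator stands in for the case-insensitive map that holds the `path` entries.

```java
import java.util.Map;
import java.util.TreeMap;

class HealthCheckPathSketch {

    // Fallback used when neither a per-service nor a "default" entry is configured.
    static final String FALLBACK_PATH = "/actuator/health";

    // Hypothetical helper: a per-service entry wins, then "default", then the fallback.
    static String resolvePath(Map<String, String> paths, String serviceId) {
        String defaultPath = paths.getOrDefault("default", FALLBACK_PATH);
        return paths.getOrDefault(serviceId, defaultPath);
    }

    public static void main(String[] args) {
        // Keys are compared case-insensitively (an assumption of this sketch).
        Map<String, String> paths = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        paths.put("orders", "/internal/health");

        System.out.println(resolvePath(paths, "ORDERS"));  // prints /internal/health
        System.out.println(resolvePath(paths, "billing")); // prints /actuator/health
    }
}
```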

TIP:: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.
TIP: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.

In order to use the health-check scheduler approach, you will have to instantiate a `HealthCheckServiceInstanceListSupplier` bean in a <<custom-loadbalancer-configuration,custom configuration>>.

@@ -987,7 +993,7 @@ public class CustomLoadBalancerConfiguration {
}
----

NOTE:: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`, therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.
WARNING: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`. Therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.


[[spring-cloud-loadbalancer-starter]]
@@ -56,8 +56,44 @@ public static class HealthCheck {
*/
private Duration interval = Duration.ofSeconds(25);

/**
* Interval for refetching available service instances.
*/
private Duration refetchInstancesInterval = Duration.ofSeconds(25);

private Map<String, String> path = new LinkedCaseInsensitiveMap<>();

/**
* Indicates whether the instances should be refetched by the
* <code>HealthCheckServiceInstanceListSupplier</code>. This can be used if the
* instances can be updated and the underlying delegate does not provide an
* ongoing flux.
*/
private boolean refetchInstances = false;

/**
* Indicates whether health checks should keep repeating. It might be useful to
* set it to <code>false</code> if periodically refetching the instances, as every
* refetch will also trigger a healthcheck.
*/
private boolean repeatHealthCheck = true;

public boolean getRefetchInstances() {
return refetchInstances;
}

public void setRefetchInstances(boolean refetchInstances) {
this.refetchInstances = refetchInstances;
}

public boolean getRepeatHealthCheck() {
return repeatHealthCheck;
}

public void setRepeatHealthCheck(boolean repeatHealthCheck) {
this.repeatHealthCheck = repeatHealthCheck;
}

public int getInitialDelay() {
return initialDelay;
}
@@ -66,6 +102,14 @@ public void setInitialDelay(int initialDelay) {
this.initialDelay = initialDelay;
}

public Duration getRefetchInstancesInterval() {
return refetchInstancesInterval;
}

public void setRefetchInstancesInterval(Duration refetchInstancesInterval) {
this.refetchInstancesInterval = refetchInstancesInterval;
}

public Map<String, String> getPath() {
return path;
}
@@ -26,6 +26,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.retry.Repeat;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@@ -64,14 +65,19 @@ public class HealthCheckServiceInstanceListSupplier
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
super(delegate);
this.healthCheck = healthCheck;
defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
"/actuator/health");
this.webClient = webClient;
aliveInstancesReplay = Flux.defer(delegate)
.delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()))
this.healthCheck = healthCheck;
Repeat<Object> aliveInstancesReplayRepeat = Repeat
.onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
.fixedBackoff(healthCheck.getRefetchInstancesInterval());
Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate)
.switchMap(serviceInstances -> healthCheckFlux(serviceInstances).map(
alive -> Collections.unmodifiableList(new ArrayList<>(alive))))
.repeatWhen(aliveInstancesReplayRepeat);
aliveInstancesReplay = aliveInstancesFlux
.delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()))
.replay(1).refCount(1);
}

@@ -86,6 +92,9 @@ public void afterPropertiesSet() {

protected Flux<List<ServiceInstance>> healthCheckFlux(
List<ServiceInstance> instances) {
Repeat<Object> healthCheckFluxRepeat = Repeat
.onlyIf(repeatContext -> healthCheck.getRepeatHealthCheck())
.fixedBackoff(healthCheck.getInterval());
return Flux.defer(() -> {
List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
for (ServiceInstance instance : instances) {
@@ -117,7 +126,7 @@ protected Flux<List<ServiceInstance>> healthCheckFlux(
result.add(alive);
return result;
}).defaultIfEmpty(result);
}).repeatWhen(restart -> restart.delayElements(healthCheck.getInterval()));
}).repeatWhen(healthCheckFluxRepeat);
}

@Override
@@ -17,6 +17,7 @@
package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -47,6 +48,8 @@
import org.springframework.web.reactive.function.client.WebClient;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Tests for {@link HealthCheckServiceInstanceListSupplier}.
@@ -79,7 +82,7 @@ void setUp() {
}

@AfterEach
void tearDown() throws Exception {
void tearDown() {
if (listSupplier != null) {
listSupplier.destroy();
listSupplier = null;
@@ -140,14 +143,14 @@ void shouldReturnOnlyAliveService() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.just(false)).when(mock).isAlive(serviceInstance2);

@@ -176,14 +179,14 @@ void shouldEmitOnEachAliveServiceInBatch() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance2);

@@ -213,14 +216,14 @@ void shouldNotFailIfIsAliveReturnsError() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
.isAlive(serviceInstance2);
@@ -250,8 +253,8 @@ void shouldEmitAllInstancesIfAllIsAliveChecksFailed() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));
@@ -282,8 +285,8 @@ void shouldMakeInitialDaleyAfterPropertiesSet() {
SERVICE_ID, "127.0.0.1", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get())
.thenReturn(Flux.just(Lists.list(serviceInstance1)));
@@ -314,14 +317,14 @@ void shouldRepeatIsAliveChecksIndefinitely() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(false), Mono.just(true)).when(mock)
.isAlive(serviceInstance1);
Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
@@ -352,14 +355,14 @@ void shouldTimeoutIsAliveCheck() {
SERVICE_ID, "127.0.0.1", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get())
.thenReturn(Flux.just(Lists.list(serviceInstance1)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.when(mock.isAlive(serviceInstance1)).thenReturn(Mono.never(),
Mono.just(true));

@@ -391,8 +394,8 @@ void shouldUpdateInstances() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Flux<List<ServiceInstance>> instances = Flux
.just(Lists.list(serviceInstance1))
@@ -421,6 +424,39 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
.verify(VERIFY_TIMEOUT);
}

@Test
void shouldRefetchInstances() {
healthCheck.setInitialDelay(1000);
healthCheck.setRepeatHealthCheck(false);
healthCheck.setRefetchInstancesInterval(Duration.ofSeconds(1));
healthCheck.setRefetchInstances(true);
ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1",
SERVICE_ID, "127.0.0.1", port, false);
ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2",
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
when(delegate.get())
.thenReturn(Flux.just(Collections.singletonList(serviceInstance1)))
.thenReturn(Flux.just(Collections.singletonList(serviceInstance2)));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
};
return listSupplier.get();
}).expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(serviceInstance1))
.thenAwait(healthCheck.getRefetchInstancesInterval())
.expectNext(Lists.list(serviceInstance2)).thenCancel()
.verify(VERIFY_TIMEOUT);
}

@Test
void shouldCacheResultIfAfterPropertiesSetInvoked() {
healthCheck.setInitialDelay(1000);
@@ -430,8 +466,8 @@ void shouldCacheResultIfAfterPropertiesSetInvoked() {
AtomicInteger emitCounter = new AtomicInteger();

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get())
.thenReturn(Flux.just(Lists.list(serviceInstance1)));
@@ -468,8 +504,7 @@ void shouldCancelSubscription() {

final AtomicInteger instancesCanceled = new AtomicInteger();
final AtomicBoolean subscribed = new AtomicBoolean();
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.get())
.thenReturn(Flux.<List<ServiceInstance>>never()
.doOnSubscribe(subscription -> subscribed.set(true))