Skip to content

Commit 88b2f0e

Browse files
Gh 760 health check with cache new (spring-cloud#765)
* Cache first element of service instance list flux. * Invoke destroy() and afterPropertiesSet() in non-bean ServiceInstanceListSupplier delegates. * Fix return updated instances. * Fix return updated instances.
1 parent 766ab94 commit 88b2f0e

File tree

6 files changed

+321
-8
lines changed

6 files changed

+321
-8
lines changed

spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
6969
return Mono.empty();
7070
}
7171
return Flux.just(list).materialize().collectList();
72-
}, delegate.getServiceId()).onCacheMissResume(delegate)
72+
}, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
7373
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
7474
.doOnNext(instances -> {
7575
Cache cache = cacheManager

spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DelegatingServiceInstanceListSupplier.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,21 @@
1616

1717
package org.springframework.cloud.loadbalancer.core;
1818

19+
import org.springframework.beans.factory.DisposableBean;
20+
import org.springframework.beans.factory.InitializingBean;
1921
import org.springframework.util.Assert;
2022

2123
/**
2224
* Represents a {@link ServiceInstanceListSupplier} that uses a delegate
2325
* {@link ServiceInstanceListSupplier} instance underneath.
2426
*
2527
* @author Spencer Gibb
28+
* @author Olga Maciaszek-Sharma
2629
*/
2730
public abstract class DelegatingServiceInstanceListSupplier
28-
implements ServiceInstanceListSupplier {
31+
implements ServiceInstanceListSupplier, InitializingBean, DisposableBean {
2932

30-
private final ServiceInstanceListSupplier delegate;
33+
protected final ServiceInstanceListSupplier delegate;
3134

3235
public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
3336
Assert.notNull(delegate, "delegate may not be null");
@@ -43,4 +46,18 @@ public String getServiceId() {
4346
return this.delegate.getServiceId();
4447
}
4548

49+
@Override
50+
public void afterPropertiesSet() throws Exception {
51+
if (delegate instanceof InitializingBean) {
52+
((InitializingBean) delegate).afterPropertiesSet();
53+
}
54+
}
55+
56+
@Override
57+
public void destroy() throws Exception {
58+
if (delegate instanceof DisposableBean) {
59+
((DisposableBean) delegate).destroy();
60+
}
61+
}
62+
4663
}

spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,20 +41,21 @@ public class DiscoveryClientServiceInstanceListSupplier
4141

4242
private final String serviceId;
4343

44-
private final Flux<ServiceInstance> serviceInstances;
44+
private final Flux<List<ServiceInstance>> serviceInstances;
4545

4646
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate,
4747
Environment environment) {
4848
this.serviceId = environment.getProperty(PROPERTY_NAME);
4949
this.serviceInstances = Flux
50-
.defer(() -> Flux.fromIterable(delegate.getInstances(serviceId)))
51-
.subscribeOn(Schedulers.boundedElastic());
50+
.defer(() -> Flux.fromIterable(delegate.getInstances(serviceId))
51+
.collectList().flux().subscribeOn(Schedulers.boundedElastic()));
5252
}
5353

5454
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate,
5555
Environment environment) {
5656
this.serviceId = environment.getProperty(PROPERTY_NAME);
57-
this.serviceInstances = delegate.getInstances(serviceId);
57+
this.serviceInstances = Flux
58+
.defer(() -> delegate.getInstances(serviceId).collectList().flux());
5859
}
5960

6061
@Override
@@ -64,7 +65,7 @@ public String getServiceId() {
6465

6566
@Override
6667
public Flux<List<ServiceInstance>> get() {
67-
return serviceInstances.collectList().flux();
68+
return serviceInstances;
6869
}
6970

7071
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
/*
2+
* Copyright 2012-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.loadbalancer.core;
18+
19+
import org.junit.jupiter.api.Test;
20+
import org.junit.jupiter.api.extension.ExtendWith;
21+
import reactor.core.publisher.Flux;
22+
import reactor.core.publisher.Mono;
23+
24+
import org.springframework.beans.factory.ObjectProvider;
25+
import org.springframework.beans.factory.annotation.Autowired;
26+
import org.springframework.boot.test.context.SpringBootTest;
27+
import org.springframework.cloud.client.DefaultServiceInstance;
28+
import org.springframework.cloud.client.ServiceInstance;
29+
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
30+
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
31+
import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient;
32+
import org.springframework.cloud.loadbalancer.cache.LoadBalancerCacheManager;
33+
import org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration;
34+
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
35+
import org.springframework.context.ConfigurableApplicationContext;
36+
import org.springframework.context.annotation.Bean;
37+
import org.springframework.context.annotation.Configuration;
38+
import org.springframework.context.annotation.Import;
39+
import org.springframework.test.context.junit.jupiter.SpringExtension;
40+
import org.springframework.web.reactive.function.client.WebClient;
41+
42+
import static java.time.Duration.ofMillis;
43+
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
44+
45+
/**
46+
* Tests for {@link CachingServiceInstanceListSupplier}.
47+
*
48+
* @author Olga Maciaszek-Sharma
49+
*/
50+
@SpringBootTest(classes = CachingServiceInstanceListSupplierTests.TestConfig.class)
51+
@ExtendWith(SpringExtension.class)
52+
class CachingServiceInstanceListSupplierTests {
53+
54+
public static final String SERVICE_ID = "test";
55+
56+
static {
57+
System.setProperty("loadbalancer.client.name", SERVICE_ID);
58+
}
59+
60+
@Autowired
61+
BlockingLoadBalancerClient blockingLoadBalancerClient;
62+
63+
private static DefaultServiceInstance instance(String host, boolean secure) {
64+
return new DefaultServiceInstance(SERVICE_ID, SERVICE_ID, host, 80, secure);
65+
}
66+
67+
@Test
68+
void shouldNotHangOnCachingWhenDelegateReturnsInfiniteStream() {
69+
assertTimeoutPreemptively(ofMillis(500), () -> {
70+
blockingLoadBalancerClient.choose(SERVICE_ID);
71+
});
72+
73+
}
74+
75+
@Configuration(proxyBeanMethods = false)
76+
@Import(LoadBalancerCacheAutoConfiguration.class)
77+
protected static class TestConfig {
78+
79+
@Bean
80+
public ReactiveDiscoveryClient reactiveDiscoveryClient() {
81+
return new ReactiveDiscoveryClient() {
82+
@Override
83+
public String description() {
84+
return SERVICE_ID;
85+
}
86+
87+
@Override
88+
public Flux<ServiceInstance> getInstances(String serviceId) {
89+
return Flux.just(instance("1host", false),
90+
instance("2host-secure", true));
91+
}
92+
93+
@Override
94+
public Flux<String> getServices() {
95+
return Flux.just(SERVICE_ID);
96+
}
97+
};
98+
}
99+
100+
@Bean
101+
ReactorLoadBalancer<ServiceInstance> reactorLoadBalancer(
102+
ObjectProvider<ServiceInstanceListSupplier> provider) {
103+
return new RoundRobinLoadBalancer(provider, SERVICE_ID);
104+
}
105+
106+
@Bean
107+
LoadBalancerClientFactory loadBalancerClientFactory() {
108+
return new LoadBalancerClientFactory();
109+
}
110+
111+
@Bean
112+
BlockingLoadBalancerClient blockingLoadBalancerClient(
113+
LoadBalancerClientFactory loadBalancerClientFactory) {
114+
return new BlockingLoadBalancerClient(loadBalancerClientFactory);
115+
}
116+
117+
@Bean
118+
public LoadBalancerProperties loadBalancerProperties() {
119+
return new LoadBalancerProperties();
120+
}
121+
122+
@Bean
123+
public WebClient.Builder webClientBuilder() {
124+
return WebClient.builder();
125+
}
126+
127+
@Bean
128+
ServiceInstanceListSupplier supplier(ConfigurableApplicationContext context,
129+
ReactiveDiscoveryClient discoveryClient,
130+
LoadBalancerProperties loadBalancerProperties,
131+
WebClient.Builder webClientBuilder) {
132+
DiscoveryClientServiceInstanceListSupplier firstDelegate = new DiscoveryClientServiceInstanceListSupplier(
133+
discoveryClient, context.getEnvironment());
134+
HealthCheckServiceInstanceListSupplier delegate = new TestHealthCheckServiceInstanceListSupplier(
135+
firstDelegate, loadBalancerProperties.getHealthCheck(),
136+
webClientBuilder.build());
137+
delegate.afterPropertiesSet();
138+
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
139+
.getBeanProvider(LoadBalancerCacheManager.class);
140+
return new CachingServiceInstanceListSupplier(delegate,
141+
cacheManagerProvider.getIfAvailable());
142+
}
143+
144+
private static class TestHealthCheckServiceInstanceListSupplier
145+
extends HealthCheckServiceInstanceListSupplier {
146+
147+
TestHealthCheckServiceInstanceListSupplier(
148+
ServiceInstanceListSupplier delegate,
149+
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
150+
super(delegate, healthCheck, webClient);
151+
}
152+
153+
@Override
154+
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
155+
return Mono.just(true);
156+
}
157+
158+
}
159+
160+
}
161+
162+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright 2012-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.loadbalancer.core;
18+
19+
import org.assertj.core.util.Lists;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Test;
22+
import reactor.core.publisher.Flux;
23+
import reactor.test.StepVerifier;
24+
25+
import org.springframework.cloud.client.DefaultServiceInstance;
26+
import org.springframework.cloud.client.discovery.DiscoveryClient;
27+
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
28+
import org.springframework.mock.env.MockEnvironment;
29+
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.when;
32+
33+
/**
34+
* Tests for {@link DiscoveryClientServiceInstanceListSupplier}.
35+
*
36+
* @author Olga Maciaszek-Sharma
37+
*/
38+
class DiscoveryClientServiceInstanceListSupplierTests {
39+
40+
private static final String SERVICE_ID = "test";
41+
42+
private final MockEnvironment environment = new MockEnvironment();
43+
44+
private final ReactiveDiscoveryClient reactiveDiscoveryClient = mock(
45+
ReactiveDiscoveryClient.class);
46+
47+
private final DiscoveryClient discoveryClient = mock(DiscoveryClient.class);
48+
49+
private DiscoveryClientServiceInstanceListSupplier supplier;
50+
51+
private static DefaultServiceInstance instance(String host, boolean secure) {
52+
return new DefaultServiceInstance(SERVICE_ID, SERVICE_ID, host, 80, secure);
53+
}
54+
55+
@BeforeEach
56+
void setUp() {
57+
environment.setProperty("loadbalancer.client.name", SERVICE_ID);
58+
}
59+
60+
@Test
61+
void shouldReturnRetrievedInstances() {
62+
when(reactiveDiscoveryClient.getInstances(SERVICE_ID)).thenReturn(
63+
Flux.just(instance("1host", false), instance("2host-secure", true)));
64+
65+
StepVerifier.withVirtualTime(() -> {
66+
supplier = new DiscoveryClientServiceInstanceListSupplier(
67+
reactiveDiscoveryClient, environment);
68+
return supplier.get();
69+
}).expectSubscription().expectNext(
70+
Lists.list(instance("1host", false), instance("2host-secure", true)))
71+
.thenCancel().verify();
72+
}
73+
74+
@Test
75+
void shouldUpdateReturnRetrievedInstances() {
76+
when(reactiveDiscoveryClient.getInstances(SERVICE_ID)).thenReturn(
77+
Flux.just(instance("1host", false), instance("2host-secure", true)));
78+
supplier = new DiscoveryClientServiceInstanceListSupplier(reactiveDiscoveryClient,
79+
environment);
80+
81+
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
82+
.expectNext(Lists.list(instance("1host", false),
83+
instance("2host-secure", true)))
84+
.thenCancel().verify();
85+
86+
when(reactiveDiscoveryClient.getInstances(SERVICE_ID))
87+
.thenReturn(Flux.just(instance("1host", false),
88+
instance("2host-secure", true), instance("3host", false)));
89+
90+
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
91+
.expectNext(Lists.list(instance("1host", false),
92+
instance("2host-secure", true), instance("3host", false)))
93+
.thenCancel().verify();
94+
}
95+
96+
@Test
97+
void shouldReturnRetrievedInstancesBlockingClient() {
98+
StepVerifier.withVirtualTime(() -> {
99+
when(discoveryClient.getInstances(SERVICE_ID)).thenReturn(
100+
Lists.list(instance("1host", false), instance("2host-secure", true)));
101+
102+
supplier = new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
103+
environment);
104+
return supplier.get();
105+
}).expectSubscription().expectNext(
106+
Lists.list(instance("1host", false), instance("2host-secure", true)))
107+
.thenCancel().verify();
108+
}
109+
110+
@Test
111+
void shouldUpdateReturnRetrievedInstancesBlockingClient() {
112+
when(discoveryClient.getInstances(SERVICE_ID)).thenReturn(
113+
Lists.list(instance("1host", false), instance("2host-secure", true)));
114+
supplier = new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
115+
environment);
116+
117+
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
118+
.expectNext(Lists.list(instance("1host", false),
119+
instance("2host-secure", true)))
120+
.thenCancel().verify();
121+
122+
when(discoveryClient.getInstances(SERVICE_ID))
123+
.thenReturn(Lists.list(instance("1host", false),
124+
instance("2host-secure", true), instance("3host", false)));
125+
126+
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
127+
.expectNext(Lists.list(instance("1host", false),
128+
instance("2host-secure", true), instance("3host", false)))
129+
.thenCancel().verify();
130+
}
131+
132+
}

src/checkstyle/checkstyle-suppressions.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414
<suppress files=".*RefreshAutoConfigurationTests.*" checks="JavadocVariable"/>
1515
<suppress files=".*RefreshAutoConfigurationMoreClassPathTests.*" checks="JavadocVariable"/>
1616
<suppress files=".*EnvironmentDecryptApplicationInitializerTests.*" checks="JavadocVariable"/>
17+
<suppress files=".*CachingServiceInstanceListSupplierTests.*" checks="RegexpSinglelineJava"/>
1718
</suppressions>

0 commit comments

Comments
 (0)