Skip to content

Commit 9d768a8

Browse files
committed
Make ReactorClientHttpConnector lifecycle-aware
With this commit, ReactorClientHttpConnector now implements SmartLifecycle which optionally allows recreating the HttpClient after ReactorResourceFactory has been updated. Closes gh-31180
1 parent ab2ad74 commit 9d768a8

File tree

2 files changed

+156
-6
lines changed

2 files changed

+156
-6
lines changed

Diff for: spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

+85-6
Original file line numberDiff line numberDiff line change
@@ -20,30 +20,51 @@
2020
import java.util.concurrent.atomic.AtomicReference;
2121
import java.util.function.Function;
2222

23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
2325
import reactor.core.publisher.Mono;
2426
import reactor.netty.NettyOutbound;
2527
import reactor.netty.http.client.HttpClient;
2628
import reactor.netty.http.client.HttpClientRequest;
2729
import reactor.netty.resources.ConnectionProvider;
2830
import reactor.netty.resources.LoopResources;
2931

32+
import org.springframework.context.Lifecycle;
33+
import org.springframework.context.SmartLifecycle;
3034
import org.springframework.http.HttpMethod;
35+
import org.springframework.lang.Nullable;
3136
import org.springframework.util.Assert;
3237

3338
/**
3439
* Reactor-Netty implementation of {@link ClientHttpConnector}.
3540
*
41+
* <p>This class implements {@link Lifecycle} and can be optionally declared
42+
* as a Spring-managed bean.
43+
*
3644
* @author Brian Clozel
3745
* @author Rossen Stoyanchev
46+
* @author Sebastien Deleuze
3847
* @since 5.0
3948
* @see reactor.netty.http.client.HttpClient
4049
*/
41-
public class ReactorClientHttpConnector implements ClientHttpConnector {
50+
public class ReactorClientHttpConnector implements ClientHttpConnector, SmartLifecycle {
51+
52+
private static final Log logger = LogFactory.getLog(ReactorClientHttpConnector.class);
4253

4354
private final static Function<HttpClient, HttpClient> defaultInitializer = client -> client.compress(true);
4455

4556

46-
private final HttpClient httpClient;
57+
private HttpClient httpClient;
58+
59+
@Nullable
60+
private final ReactorResourceFactory resourceFactory;
61+
62+
@Nullable
63+
private final Function<HttpClient, HttpClient> mapper;
64+
65+
private volatile boolean running = true;
66+
67+
private final Object lifecycleMonitor = new Object();
4768

4869

4970
/**
@@ -54,6 +75,8 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
5475
*/
5576
public ReactorClientHttpConnector() {
5677
this.httpClient = defaultInitializer.apply(HttpClient.create());
78+
this.resourceFactory = null;
79+
this.mapper = null;
5780
}
5881

5982
/**
@@ -68,14 +91,20 @@ public ReactorClientHttpConnector() {
6891
* consider declaring a {@link ReactorResourceFactory} bean with
6992
* {@code globalResources=true} in order to ensure the Reactor Netty global
7093
* resources are shut down when the Spring ApplicationContext is closed.
71-
* @param factory the resource factory to obtain the resources from
94+
* @param resourceFactory the resource factory to obtain the resources from
7295
* @param mapper a mapper for further initialization of the created client
7396
* @since 5.1
7497
*/
75-
public ReactorClientHttpConnector(ReactorResourceFactory factory, Function<HttpClient, HttpClient> mapper) {
76-
ConnectionProvider provider = factory.getConnectionProvider();
98+
public ReactorClientHttpConnector(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
99+
this.httpClient = createHttpClient(resourceFactory, mapper);
100+
this.resourceFactory = resourceFactory;
101+
this.mapper = mapper;
102+
}
103+
104+
private static HttpClient createHttpClient(ReactorResourceFactory resourceFactory, Function<HttpClient, HttpClient> mapper) {
105+
ConnectionProvider provider = resourceFactory.getConnectionProvider();
77106
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
78-
this.httpClient = defaultInitializer.andThen(mapper).andThen(applyLoopResources(factory))
107+
return defaultInitializer.andThen(mapper).andThen(applyLoopResources(resourceFactory))
79108
.apply(HttpClient.create(provider));
80109
}
81110

@@ -96,6 +125,8 @@ private static Function<HttpClient, HttpClient> applyLoopResources(ReactorResour
96125
public ReactorClientHttpConnector(HttpClient httpClient) {
97126
Assert.notNull(httpClient, "HttpClient is required");
98127
this.httpClient = httpClient;
128+
this.resourceFactory = null;
129+
this.mapper = null;
99130
}
100131

101132

@@ -131,4 +162,52 @@ private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpCl
131162
return new ReactorClientHttpRequest(method, uri, request, nettyOutbound);
132163
}
133164

165+
@Override
166+
public void start() {
167+
synchronized (this.lifecycleMonitor) {
168+
if (!isRunning()) {
169+
if (this.resourceFactory != null && this.mapper != null) {
170+
this.httpClient = createHttpClient(this.resourceFactory, this.mapper);
171+
}
172+
else {
173+
logger.warn("Restarting a ReactorClientHttpConnector bean is only supported with externally managed Reactor Netty resources");
174+
}
175+
this.running = true;
176+
}
177+
}
178+
}
179+
180+
@Override
181+
public void stop() {
182+
synchronized (this.lifecycleMonitor) {
183+
if (isRunning()) {
184+
this.running = false;
185+
}
186+
}
187+
}
188+
189+
@Override
190+
public final void stop(Runnable callback) {
191+
synchronized (this.lifecycleMonitor) {
192+
stop();
193+
callback.run();
194+
}
195+
}
196+
197+
@Override
198+
public boolean isRunning() {
199+
return this.running;
200+
}
201+
202+
@Override
203+
public boolean isAutoStartup() {
204+
return false;
205+
}
206+
207+
@Override
208+
public int getPhase() {
209+
// Start after ReactorResourceFactory
210+
return 1;
211+
}
212+
134213
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2002-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client.reactive;
18+
19+
import java.util.function.Function;
20+
21+
import org.junit.jupiter.api.Test;
22+
import reactor.netty.http.client.HttpClient;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
/**
27+
* @author Sebastien Deleuze
28+
*/
29+
public class ReactorClientHttpConnectorTests {
30+
31+
@Test
32+
void restartWithDefaultConstructor() {
33+
ReactorClientHttpConnector connector = new ReactorClientHttpConnector();
34+
assertThat(connector.isRunning()).isTrue();
35+
connector.start();
36+
assertThat(connector.isRunning()).isTrue();
37+
connector.stop();
38+
assertThat(connector.isRunning()).isFalse();
39+
connector.start();
40+
assertThat(connector.isRunning()).isTrue();
41+
}
42+
43+
@Test
44+
void restartWithExternalResourceFactory() {
45+
ReactorResourceFactory resourceFactory = new ReactorResourceFactory();
46+
resourceFactory.afterPropertiesSet();
47+
Function<HttpClient, HttpClient> mapper = Function.identity();
48+
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(resourceFactory, mapper);
49+
assertThat(connector.isRunning()).isTrue();
50+
connector.start();
51+
assertThat(connector.isRunning()).isTrue();
52+
connector.stop();
53+
assertThat(connector.isRunning()).isFalse();
54+
connector.start();
55+
assertThat(connector.isRunning()).isTrue();
56+
}
57+
58+
@Test
59+
void restartWithHttpClient() {
60+
HttpClient httpClient = HttpClient.create();
61+
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
62+
assertThat(connector.isRunning()).isTrue();
63+
connector.start();
64+
assertThat(connector.isRunning()).isTrue();
65+
connector.stop();
66+
assertThat(connector.isRunning()).isFalse();
67+
connector.start();
68+
assertThat(connector.isRunning()).isTrue();
69+
}
70+
71+
}

0 commit comments

Comments
 (0)