19
19
import java .io .IOException ;
20
20
import java .net .URI ;
21
21
import java .time .Duration ;
22
+ import java .util .function .Function ;
22
23
23
24
import io .netty .channel .ChannelOption ;
25
+ import org .apache .commons .logging .Log ;
26
+ import org .apache .commons .logging .LogFactory ;
24
27
import reactor .netty .http .client .HttpClient ;
28
+ import reactor .netty .resources .ConnectionProvider ;
29
+ import reactor .netty .resources .LoopResources ;
25
30
31
+ import org .springframework .context .SmartLifecycle ;
26
32
import org .springframework .http .HttpMethod ;
33
+ import org .springframework .http .client .reactive .ReactorResourceFactory ;
34
+ import org .springframework .lang .Nullable ;
27
35
import org .springframework .util .Assert ;
28
36
29
37
/**
30
38
* Reactor-Netty implementation of {@link ClientHttpRequestFactory}.
31
39
*
40
+ * <p>This class implements {@link SmartLifecycle} and can be optionally declared
41
+ * as a Spring-managed bean in order to support JVM Checkpoint Restore.
42
+ *
32
43
* @author Arjen Poutsma
44
+ * @author Sebastien Deleuze
33
45
* @since 6.1
34
46
*/
35
- public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory {
47
+ public class ReactorNettyClientRequestFactory implements ClientHttpRequestFactory , SmartLifecycle {
48
+
49
+ private static final Log logger = LogFactory .getLog (ReactorNettyClientRequestFactory .class );
50
+
51
+ private final static Function <HttpClient , HttpClient > defaultInitializer = client -> client .compress (true );
52
+
53
+
54
+ private HttpClient httpClient ;
36
55
37
- private final HttpClient httpClient ;
56
+ @ Nullable
57
+ private final ReactorResourceFactory resourceFactory ;
58
+
59
+ @ Nullable
60
+ private final Function <HttpClient , HttpClient > mapper ;
38
61
39
62
private Duration exchangeTimeout = Duration .ofSeconds (5 );
40
63
41
64
private Duration readTimeout = Duration .ofSeconds (10 );
42
65
66
+ private volatile boolean running = true ;
67
+
68
+ private final Object lifecycleMonitor = new Object ();
69
+
43
70
44
71
/**
45
72
* Create a new instance of the {@code ReactorNettyClientRequestFactory}
46
73
* with a default {@link HttpClient} that has compression enabled.
47
74
*/
48
75
public ReactorNettyClientRequestFactory () {
49
- this (HttpClient .create ().compress (true ));
76
+ this .httpClient = defaultInitializer .apply (HttpClient .create ());
77
+ this .resourceFactory = null ;
78
+ this .mapper = null ;
50
79
}
51
80
52
81
/**
@@ -57,6 +86,47 @@ public ReactorNettyClientRequestFactory() {
57
86
public ReactorNettyClientRequestFactory (HttpClient httpClient ) {
58
87
Assert .notNull (httpClient , "HttpClient must not be null" );
59
88
this .httpClient = httpClient ;
89
+ this .resourceFactory = null ;
90
+ this .mapper = null ;
91
+ }
92
+
93
+ /**
94
+ * Constructor with externally managed Reactor Netty resources, including
95
+ * {@link LoopResources} for event loop threads, and {@link ConnectionProvider}
96
+ * for the connection pool.
97
+ * <p>This constructor should be used only when you don't want the client
98
+ * to participate in the Reactor Netty global resources. By default the
99
+ * client participates in the Reactor Netty global resources held in
100
+ * {@link reactor.netty.http.HttpResources}, which is recommended since
101
+ * fixed, shared resources are favored for event loop concurrency. However,
102
+ * consider declaring a {@link ReactorResourceFactory} bean with
103
+ * {@code globalResources=true} in order to ensure the Reactor Netty global
104
+ * resources are shut down when the Spring ApplicationContext is stopped or closed
105
+ * and restarted properly when the Spring ApplicationContext is
106
+ * (with JVM Checkpoint Restore for example).
107
+ * @param resourceFactory the resource factory to obtain the resources from
108
+ * @param mapper a mapper for further initialization of the created client
109
+ */
110
+ public ReactorNettyClientRequestFactory (ReactorResourceFactory resourceFactory , Function <HttpClient , HttpClient > mapper ) {
111
+ this .httpClient = createHttpClient (resourceFactory , mapper );
112
+ this .resourceFactory = resourceFactory ;
113
+ this .mapper = mapper ;
114
+ }
115
+
116
+
117
+ private static HttpClient createHttpClient (ReactorResourceFactory resourceFactory , Function <HttpClient , HttpClient > mapper ) {
118
+ ConnectionProvider provider = resourceFactory .getConnectionProvider ();
119
+ Assert .notNull (provider , "No ConnectionProvider: is ReactorResourceFactory not initialized yet?" );
120
+ return defaultInitializer .andThen (mapper ).andThen (applyLoopResources (resourceFactory ))
121
+ .apply (HttpClient .create (provider ));
122
+ }
123
+
124
+ private static Function <HttpClient , HttpClient > applyLoopResources (ReactorResourceFactory factory ) {
125
+ return httpClient -> {
126
+ LoopResources resources = factory .getLoopResources ();
127
+ Assert .notNull (resources , "No LoopResources: is ReactorResourceFactory not initialized yet?" );
128
+ return httpClient .runOn (resources );
129
+ };
60
130
}
61
131
62
132
@@ -129,4 +199,52 @@ public ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod) throws IO
129
199
return new ReactorNettyClientRequest (this .httpClient , uri , httpMethod , this .exchangeTimeout , this .readTimeout );
130
200
}
131
201
202
+ @ Override
203
+ public void start () {
204
+ synchronized (this .lifecycleMonitor ) {
205
+ if (!isRunning ()) {
206
+ if (this .resourceFactory != null && this .mapper != null ) {
207
+ this .httpClient = createHttpClient (this .resourceFactory , this .mapper );
208
+ }
209
+ else {
210
+ logger .warn ("Restarting a ReactorNettyClientRequestFactory bean is only supported with externally managed Reactor Netty resources" );
211
+ }
212
+ this .running = true ;
213
+ }
214
+ }
215
+ }
216
+
217
+ @ Override
218
+ public void stop () {
219
+ synchronized (this .lifecycleMonitor ) {
220
+ if (isRunning ()) {
221
+ this .running = false ;
222
+ }
223
+ }
224
+ }
225
+
226
+ @ Override
227
+ public final void stop (Runnable callback ) {
228
+ synchronized (this .lifecycleMonitor ) {
229
+ stop ();
230
+ callback .run ();
231
+ }
232
+ }
233
+
234
+ @ Override
235
+ public boolean isRunning () {
236
+ return this .running ;
237
+ }
238
+
239
+ @ Override
240
+ public boolean isAutoStartup () {
241
+ return false ;
242
+ }
243
+
244
+ @ Override
245
+ public int getPhase () {
246
+ // Start after ReactorResourceFactory
247
+ return 1 ;
248
+ }
249
+
132
250
}
0 commit comments