Skip to content

Commit 48f57e3

Browse files
committed
Explicitly manage event loop in Reactor2TcpStompClient
Reactor2TcpStompClient now explicitly manages a Netty EventLoopGroup which prevents resource leaks on attempts to reconnect. Issue: SPR-15035
1 parent d92f697 commit 48f57e3

File tree

3 files changed

+76
-20
lines changed

3 files changed

+76
-20
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java

+70-19
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,30 +20,38 @@
2020
import java.util.List;
2121
import java.util.Properties;
2222

23+
import io.netty.channel.EventLoopGroup;
2324
import reactor.Environment;
2425
import reactor.core.config.ConfigurationReader;
2526
import reactor.core.config.DispatcherConfiguration;
2627
import reactor.core.config.DispatcherType;
2728
import reactor.core.config.ReactorConfiguration;
28-
import reactor.io.net.NetStreams;
29+
import reactor.io.net.NetStreams.TcpClientFactory;
2930
import reactor.io.net.Spec.TcpClientSpec;
31+
import reactor.io.net.impl.netty.NettyClientSocketOptions;
3032

33+
import org.springframework.context.Lifecycle;
3134
import org.springframework.messaging.Message;
3235
import org.springframework.messaging.tcp.TcpOperations;
3336
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
3437
import org.springframework.util.concurrent.ListenableFuture;
3538

3639
/**
37-
* A STOMP over TCP client that uses
38-
* {@link Reactor2TcpClient}.
40+
* A STOMP over TCP client that uses {@link Reactor2TcpClient}.
3941
*
4042
* @author Rossen Stoyanchev
4143
* @since 4.2
4244
*/
43-
public class Reactor2TcpStompClient extends StompClientSupport {
45+
public class Reactor2TcpStompClient extends StompClientSupport implements Lifecycle {
4446

4547
private final TcpOperations<byte[]> tcpClient;
4648

49+
private final EventLoopGroup eventLoopGroup;
50+
51+
private final Environment environment;
52+
53+
private volatile boolean running = false;
54+
4755

4856
/**
4957
* Create an instance with host "127.0.0.1" and port 61613.
@@ -57,11 +65,11 @@ public Reactor2TcpStompClient() {
5765
* @param host the host
5866
* @param port the port
5967
*/
60-
public Reactor2TcpStompClient(final String host, final int port) {
61-
ConfigurationReader reader = new StompClientDispatcherConfigReader();
62-
Environment environment = new Environment(reader).assignErrorJournal();
63-
StompTcpClientSpecFactory factory = new StompTcpClientSpecFactory(environment, host, port);
64-
this.tcpClient = new Reactor2TcpClient<byte[]>(factory);
68+
public Reactor2TcpStompClient(String host, int port) {
69+
this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
70+
this.environment = new Environment();
71+
this.tcpClient = new Reactor2TcpClient<byte[]>(
72+
new StompTcpClientSpecFactory(host, port, this.eventLoopGroup, this.environment));
6573
}
6674

6775
/**
@@ -70,6 +78,43 @@ public Reactor2TcpStompClient(final String host, final int port) {
7078
*/
7179
public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
7280
this.tcpClient = tcpClient;
81+
this.eventLoopGroup = null;
82+
this.environment = null;
83+
}
84+
85+
86+
@Override
87+
public void start() {
88+
if (!isRunning()) {
89+
this.running = true;
90+
91+
}
92+
}
93+
94+
@Override
95+
public void stop() {
96+
if (isRunning()) {
97+
this.running = false;
98+
try {
99+
if (this.eventLoopGroup != null) {
100+
this.eventLoopGroup.shutdownGracefully().await(5000);
101+
}
102+
if (this.environment != null) {
103+
this.environment.shutdown();
104+
}
105+
}
106+
catch (InterruptedException ex) {
107+
if (logger.isErrorEnabled()) {
108+
logger.error("Failed to shutdown gracefully", ex);
109+
}
110+
}
111+
112+
}
113+
}
114+
115+
@Override
116+
public boolean isRunning() {
117+
return this.running;
73118
}
74119

75120

@@ -120,30 +165,36 @@ public ReactorConfiguration read() {
120165
}
121166

122167

123-
private static class StompTcpClientSpecFactory
124-
implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {
125-
126-
private final Environment environment;
168+
private static class StompTcpClientSpecFactory implements TcpClientFactory<Message<byte[]>, Message<byte[]>> {
127169

128170
private final String host;
129171

130172
private final int port;
131173

132-
public StompTcpClientSpecFactory(Environment environment, String host, int port) {
133-
this.environment = environment;
174+
private final EventLoopGroup eventLoopGroup;
175+
176+
private final Environment environment;
177+
178+
179+
public StompTcpClientSpecFactory(String host, int port, EventLoopGroup group, Environment environment) {
134180
this.host = host;
135181
this.port = port;
182+
this.eventLoopGroup = group;
183+
this.environment = environment;
136184
}
137185

138186
@Override
139187
public TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
140188
TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
141189

190+
final Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), new StompDecoder());
191+
142192
return tcpClientSpec
143-
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
144193
.env(this.environment)
145-
.dispatcher(this.environment.getCachedDispatchers("StompClient").get())
146-
.connect(this.host, this.port);
194+
.dispatcher(this.environment.getDispatcher(Environment.WORK_QUEUE))
195+
.connect(this.host, this.port)
196+
.codec(codec)
197+
.options(new NettyClientSocketOptions().eventLoopGroup(this.eventLoopGroup));
147198
}
148199
}
149200

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompClientSupport.java

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import java.util.Arrays;
2020

21+
import org.apache.commons.logging.Log;
22+
import org.apache.commons.logging.LogFactory;
23+
2124
import org.springframework.messaging.converter.MessageConverter;
2225
import org.springframework.messaging.converter.SimpleMessageConverter;
2326
import org.springframework.scheduling.TaskScheduler;
@@ -40,6 +43,8 @@
4043
*/
4144
public abstract class StompClientSupport {
4245

46+
protected Log logger = LogFactory.getLog(getClass());
47+
4348
private MessageConverter messageConverter = new SimpleMessageConverter();
4449

4550
private TaskScheduler taskScheduler;

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public Reactor2TcpClient(TcpClientFactory<Message<P>, Message<P>> tcpClientSpecF
146146
}
147147

148148

149-
private static NioEventLoopGroup initEventLoopGroup() {
149+
public static NioEventLoopGroup initEventLoopGroup() {
150150
int ioThreadCount;
151151
try {
152152
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));

0 commit comments

Comments
 (0)