Skip to content

Commit 7c37497

Browse files
committed
Add STOMP broker relay to configure "host" header
Issue: SPR-10955
1 parent cf7889e commit 7c37497

File tree

2 files changed

+60
-7
lines changed

2 files changed

+60
-7
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

Lines changed: 58 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,24 @@
4747

4848

4949
/**
50-
* A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker and
51-
* reversely sends any returned messages from the broker to the provided
52-
* {@link MessageChannel}.
50+
* A {@link MessageHandler} that handles messages by forwarding them to a STOMP broker.
51+
* For each new {@link SimpMessageType#CONNECT CONNECT} message, an independent TCP
52+
* connection to the broker is opened and used exclusively for all messages from the
53+
* client that originated the CONNECT message. Messages from the same client are
54+
* identified through the session id message header. Reversely, when the STOMP broker
55+
* sends messages back on the TCP connection, those messages are enriched with the session
56+
* id of the client and sent back downstream through the {@link MessageChannel} provided
57+
* to the constructor.
58+
* <p>
59+
* This class also automatically opens a default "system" TCP connection to the message
60+
* broker that is used for sending messages that originate from the server application (as
61+
* opposed to from a client). Such messages are recognized because they are not associated
62+
* with any client and therefore do not have a session id header. The "system" connection
63+
* is effectively shared and cannot be used to receive messages. Several properties are
64+
* provided to configure the "system" session including the the
65+
* {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode},
66+
* heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and
67+
* {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals.
5368
*
5469
* @author Rossen Stoyanchev
5570
* @author Andy Wilkinson
@@ -71,6 +86,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
7186

7287
private long systemHeartbeatReceiveInterval = 10000;
7388

89+
private String virtualHost;
90+
7491
private Environment environment;
7592

7693
private TcpClient<Message<byte[]>, Message<byte[]>> tcpClient;
@@ -120,11 +137,13 @@ public int getRelayPort() {
120137
}
121138

122139
/**
123-
* Set the interval, in milliseconds, at which the "system" relay session will,
124-
* in the absence of any other data being sent, send a heartbeat to the STOMP broker.
125-
* A value of zero will prevent heartbeats from being sent to the broker.
140+
* Set the interval, in milliseconds, at which the "system" relay session will, in the
141+
* absence of any other data being sent, send a heartbeat to the STOMP broker. A value
142+
* of zero will prevent heartbeats from being sent to the broker.
126143
* <p>
127144
* The default value is 10000.
145+
* <p>
146+
* See class-level documentation for more information on the "system" session.
128147
*/
129148
public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) {
130149
this.systemHeartbeatSendInterval = systemHeartbeatSendInterval;
@@ -145,6 +164,8 @@ public long getSystemHeartbeatSendInterval() {
145164
* heartbeats from the broker.
146165
* <p>
147166
* The default value is 10000.
167+
* <p>
168+
* See class-level documentation for more information on the "system" session.
148169
*/
149170
public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) {
150171
this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval;
@@ -161,6 +182,8 @@ public long getSystemHeartbeatReceiveInterval() {
161182
/**
162183
* Set the login for the "system" relay session used to send messages to the STOMP
163184
* broker without having a client session (e.g. REST/HTTP request handling method).
185+
* <p>
186+
* See class-level documentation for more information on the "system" session.
164187
*/
165188
public void setSystemLogin(String systemLogin) {
166189
Assert.hasText(systemLogin, "systemLogin must not be empty");
@@ -177,6 +200,8 @@ public String getSystemLogin() {
177200
/**
178201
* Set the passcode for the "system" relay session used to send messages to the STOMP
179202
* broker without having a client session (e.g. REST/HTTP request handling method).
203+
* <p>
204+
* See class-level documentation for more information on the "system" session.
180205
*/
181206
public void setSystemPasscode(String systemPasscode) {
182207
this.systemPasscode = systemPasscode;
@@ -189,6 +214,26 @@ public String getSystemPasscode() {
189214
return this.systemPasscode;
190215
}
191216

217+
/**
218+
* Set the value of the "host" header to use in STOMP CONNECT frames. When this
219+
* property is configured, a "host" header will be added to every STOMP frame sent to
220+
* the STOMP broker. This may be useful for example in a cloud environment where the
221+
* actual host to which the TCP connection is established is different from the host
222+
* providing the cloud-based STOMP service.
223+
* <p>
224+
* By default this property is not set.
225+
*/
226+
public void setVirtualHost(String virtualHost) {
227+
this.virtualHost = virtualHost;
228+
}
229+
230+
/**
231+
* @return the configured virtual host value.
232+
*/
233+
public String getVirtualHost() {
234+
return this.virtualHost;
235+
}
236+
192237

193238
@Override
194239
protected void startInternal() {
@@ -252,7 +297,10 @@ protected void handleMessageInternal(Message<?> message) {
252297
}
253298

254299
if (SimpMessageType.CONNECT.equals(messageType)) {
255-
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
300+
if (getVirtualHost() != null) {
301+
headers.setHost(getVirtualHost());
302+
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
303+
}
256304
StompRelaySession session = new StompRelaySession(sessionId);
257305
this.relaySessions.put(sessionId, session);
258306
session.connect(message);
@@ -516,6 +564,9 @@ public void connect() {
516564
headers.setLogin(systemLogin);
517565
headers.setPasscode(systemPasscode);
518566
headers.setHeartbeat(systemHeartbeatSendInterval, systemHeartbeatReceiveInterval);
567+
if (getVirtualHost() != null) {
568+
headers.setHost(getVirtualHost());
569+
}
519570
Message<?> connectMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
520571
super.connect(connectMessage);
521572
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public void tearDown() throws Exception {
113113
}
114114
}
115115

116+
// test "host" header (virtualHost property) when TCP client is behind interface and configurable
117+
116118
@Test
117119
public void publishSubscribe() throws Exception {
118120

0 commit comments

Comments
 (0)