Skip to content

Commit 88a17a4

Browse files
committed
Reactor2TcpClient constructor with address supplier
Issue: SPR-12452
1 parent 0e28bee commit 88a17a4

File tree

3 files changed

+63
-11
lines changed

3 files changed

+63
-11
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import org.springframework.messaging.MessageChannel;
2121
import org.springframework.messaging.SubscribableChannel;
2222
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
23+
import org.springframework.messaging.tcp.TcpOperations;
2324
import org.springframework.util.Assert;
2425

2526
/**
@@ -51,6 +52,9 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
5152
@Nullable
5253
private String virtualHost;
5354

55+
@Nullable
56+
private TcpOperations<byte[]> tcpClient;
57+
5458
private boolean autoStartup = true;
5559

5660
@Nullable
@@ -166,6 +170,18 @@ public StompBrokerRelayRegistration setVirtualHost(String virtualHost) {
166170
return this;
167171
}
168172

173+
/**
174+
* Configure a TCP client for managing TCP connections to the STOMP broker.
175+
* <p>By default {@code ReactorNettyTcpClient} is used.
176+
* <p><strong>Note:</strong> when this property is used, any
177+
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
178+
* specified are effectively ignored.
179+
* @since 4.3.15
180+
*/
181+
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
182+
this.tcpClient = tcpClient;
183+
}
184+
169185
/**
170186
* Configure whether the {@link StompBrokerRelayMessageHandler} should start
171187
* automatically when the Spring ApplicationContext is refreshed.
@@ -239,6 +255,9 @@ protected StompBrokerRelayMessageHandler getMessageHandler(SubscribableChannel b
239255
if (this.virtualHost != null) {
240256
handler.setVirtualHost(this.virtualHost);
241257
}
258+
if (this.tcpClient != null) {
259+
handler.setTcpClient(this.tcpClient);
260+
}
242261

243262
handler.setAutoStartup(this.autoStartup);
244263

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -340,6 +340,9 @@ public String getVirtualHost() {
340340
/**
341341
* Configure a TCP client for managing TCP connections to the STOMP broker.
342342
* <p>By default {@link ReactorNettyTcpClient} is used.
343+
* <p><strong>Note:</strong> when this property is used, any
344+
* {@link #setRelayHost(String) host} or {@link #setRelayPort(int) port}
345+
* specified are effectively ignored.
343346
*/
344347
public void setTcpClient(@Nullable TcpOperations<byte[]> tcpClient) {
345348
this.tcpClient = tcpClient;
@@ -613,8 +616,8 @@ public void afterConnectFailure(Throwable ex) {
613616
* the TCP connection, failure to send a message, missed heartbeat, etc.
614617
*/
615618
protected void handleTcpConnectionFailure(String error, @Nullable Throwable ex) {
616-
if (logger.isErrorEnabled()) {
617-
logger.error("TCP connection failure in session " + this.sessionId + ": " + error, ex);
619+
if (logger.isWarnEnabled()) {
620+
logger.warn("TCP connection failure in session " + this.sessionId + ": " + error, ex);
618621
}
619622
try {
620623
sendStompErrorFrameToClient(error);

src/docs/asciidoc/web/websocket.adoc

+37-7
Original file line numberDiff line numberDiff line change
@@ -1440,9 +1440,9 @@ values ``guest``/``guest``.
14401440
====
14411441
The STOMP broker relay always sets the `login` and `passcode` headers on every `CONNECT`
14421442
frame that it forwards to the broker on behalf of clients. Therefore WebSocket clients
1443-
need not set those headers; they will be ignored. As the following section explains,
1444-
instead WebSocket clients should rely on HTTP authentication to protect the WebSocket
1445-
endpoint and establish the client identity.
1443+
need not set those headers; they will be ignored. As the <<websocket-stomp-authentication>>
1444+
section explains, instead WebSocket clients should rely on HTTP authentication to protect
1445+
the WebSocket endpoint and establish the client identity.
14461446
====
14471447

14481448
The STOMP broker relay also sends and receives heartbeats to and from the message
@@ -1451,13 +1451,43 @@ and receiving heartbeats (10 seconds each by default). If connectivity to the br
14511451
is lost, the broker relay will continue to try to reconnect, every 5 seconds,
14521452
until it succeeds.
14531453

1454-
[NOTE]
1455-
====
1456-
A Spring bean can implement `ApplicationListener<BrokerAvailabilityEvent>` in order
1454+
Any Spring bean can implement `ApplicationListener<BrokerAvailabilityEvent>` in order
14571455
to receive notifications when the "system" connection to the broker is lost and
14581456
re-established. For example a Stock Quote service broadcasting stock quotes can
14591457
stop trying to send messages when there is no active "system" connection.
1460-
====
1458+
1459+
By default, the STOMP broker relay always connects, and reconnects as needed if
1460+
connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
1461+
on each attempt to connect, you can configure a supplier of addresses, instead of a
1462+
fixed host and port. For example:
1463+
1464+
[source,java,indent=0]
1465+
[subs="verbatim,quotes"]
1466+
----
1467+
@Configuration
1468+
@EnableWebSocketMessageBroker
1469+
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
1470+
1471+
// ...
1472+
1473+
@Override
1474+
public void configureMessageBroker(MessageBrokerRegistry registry) {
1475+
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
1476+
registry.setApplicationDestinationPrefixes("/app");
1477+
}
1478+
1479+
private ReactorNettyTcpClient<byte[]> createTcpClient() {
1480+
1481+
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
1482+
builder.connectAddress(()-> {
1483+
// Select address to connect to ...
1484+
});
1485+
};
1486+
1487+
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
1488+
}
1489+
}
1490+
----
14611491

14621492
The STOMP broker relay can also be configured with a `virtualHost` property.
14631493
The value of this property will be set as the `host` header of every `CONNECT` frame

0 commit comments

Comments
 (0)