-
Notifications
You must be signed in to change notification settings - Fork 38.4k
Allow multiple destinations per subscription in DefaultSubscriptionRegistry #26986
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@alienisty Please sign the Contributor License Agreement! Click here to manually synchronize the status of this Pull Request. See the FAQ for frequently asked questions. |
@alienisty Thank you for signing the Contributor License Agreement! |
...ing/src/main/java/org/springframework/messaging/simp/broker/DefaultSubscriptionRegistry.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subscription id's must be unique. From the STOMP spec:
Since a single connection can have multiple open subscriptions with a server, an id header MUST be included in the frame to uniquely identify the subscription.
This has been discussed at length under #26118 and #26474. I believe this is likely the same issue.
Can you share what your @EnableWebSocketMessageBroker
config looks like? In particular what, if any, destination prefixes, are configured.
We don't use the annotation, we extend DelegatingWebSocketMessageBrokerConfiguration directly: @Configuration
public static class MessageBrokerConfiguration
extends DelegatingWebSocketMessageBrokerConfiguration
implements ApplicationListener<ApplicationPreparedEvent> {
private final ApplicationProperties.WebSocket.StompBrokerRelay config;
private final WebSocketMessageBrokerStats webSocketMessageBrokerStats;
public MessageBrokerConfiguration(
ApplicationContext context,
ApplicationProperties properties,
@Lazy WebSocketMessageBrokerStats webSocketMessageBrokerStats) {
// We need to ensure that the application context is available when the STOMP sub-protocol bean is built,
// otherwise the session events won't work. (see WebSocketMessageBrokerConfigurationSupport.stompWebSocketHandlerMapping())
this.setApplicationContext(context);
this.config = properties.getWebSocket().getStompBrokerRelay();
this.webSocketMessageBrokerStats = webSocketMessageBrokerStats;
}
@Override
public void onApplicationEvent(ApplicationPreparedEvent event) {
long statsLoggingPeriod = config.getStatsLoggingPeriod();
webSocketMessageBrokerStats.setLoggingPeriod(MINUTES.toMillis(statsLoggingPeriod));
}
} As you can see there is nothing changing how the protocol is implemented and the superclass creates and registers an instance of SubProtocolWebSocketHandler and UserDestinationMessageHandler into the ExecutorSubscribableChannel. In ExecutorSubscribableChannel.sendInternal(), the same Message, therefore with the same subscription-id header is passed to both handlers. When the message is a "SUBSCRIBE", both hanlders will try to subscribe their destinations with the same subscription-id. I can appreciate what the STOMP protocol specification says, I would argue, though, that the subscription-id must be unique per connection not per destination, so much so that messages need to specify both subscription-id and destination; if they were a one-to-one association, the destination header would be redundant. At the very least we have a problem with how user destinations are registered. |
I think your issue is exactly the same. If there aren't any destination prefixes configured for messages to the broker, then the broker ends up having all messages forwarded to it, but "/user" prefixed messages aren't meant to go directly there. Instead, Can you try configuring prefixes for the messages that should go to the broker? See the discussion and recommendations under #26474. |
Hi @rstoyanchev, setting the user destination prefix to "/user" fixes the problem, but I think that either the reference documentation should be changed or such prefix should be "/user" by default. In fact, the reference says:
which is true, but the reference documentation doesn't mention the user destination prefix, and for sending a message using a messaging template, it only says:
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
} which now it does not work unless we set the user destination prefix to "/user". |
I have added a3655c4 to add some guidance to the reference docs as suggested and will close this issue. For 6.0, we could consider having the broker explicitly filter out messages with user destinations when it is itself not configured with any specific prefixes, but that would be better as a separate issue. |
Issue #24395 broke "user" destinations because it made subscription-id->destination a one to one association whereas it is a one to many.
This restores such state while keeping the optimisations done in #24395.