Skip to content

Commit 3836aa0

Browse files
committed
Message broker thread pools should be set up in allowCoreThreadTimeOut mode
Issue: SPR-12249
1 parent e003d21 commit 3836aa0

File tree

3 files changed

+42
-51
lines changed

3 files changed

+42
-51
lines changed

spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2013 the original author or authors.
2+
* Copyright 2002-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -78,10 +78,10 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
7878

7979
private int keepAliveSeconds = 60;
8080

81-
private boolean allowCoreThreadTimeOut = false;
82-
8381
private int queueCapacity = Integer.MAX_VALUE;
8482

83+
private boolean allowCoreThreadTimeOut = false;
84+
8585
private ThreadPoolExecutor threadPoolExecutor;
8686

8787

@@ -154,17 +154,6 @@ public int getKeepAliveSeconds() {
154154
}
155155
}
156156

157-
/**
158-
* Specify whether to allow core threads to time out. This enables dynamic
159-
* growing and shrinking even in combination with a non-zero queue (since
160-
* the max pool size will only grow once the queue is full).
161-
* <p>Default is "false".
162-
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
163-
*/
164-
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
165-
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
166-
}
167-
168157
/**
169158
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
170159
* Default is {@code Integer.MAX_VALUE}.
@@ -177,6 +166,17 @@ public void setQueueCapacity(int queueCapacity) {
177166
this.queueCapacity = queueCapacity;
178167
}
179168

169+
/**
170+
* Specify whether to allow core threads to time out. This enables dynamic
171+
* growing and shrinking even in combination with a non-zero queue (since
172+
* the max pool size will only grow once the queue is full).
173+
* <p>Default is "false".
174+
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
175+
*/
176+
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
177+
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
178+
}
179+
180180

181181
@Override
182182
protected ExecutorService initializeExecutor(

spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2020

21-
2221
/**
2322
* A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}.
2423
*
@@ -38,13 +37,11 @@ public class TaskExecutorRegistration {
3837

3938
/**
4039
* Set the core pool size of the ThreadPoolExecutor.
41-
*
42-
* <p><strong>NOTE:</strong> the core pool size is effectively the max pool size
40+
* <p><strong>NOTE:</strong> The core pool size is effectively the max pool size
4341
* when an unbounded {@link #queueCapacity(int) queueCapacity} is configured
4442
* (the default). This is essentially the "Unbounded queues" strategy as explained
4543
* in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When
4644
* this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored.
47-
*
4845
* <p>By default this is set to twice the value of
4946
* {@link Runtime#availableProcessors()}. In an an application where tasks do not
5047
* block frequently, the number should be closer to or equal to the number of
@@ -57,13 +54,11 @@ public TaskExecutorRegistration corePoolSize(int corePoolSize) {
5754

5855
/**
5956
* Set the max pool size of the ThreadPoolExecutor.
60-
*
61-
* <p><strong>NOTE:</strong> when an unbounded
57+
* <p><strong>NOTE:</strong> When an unbounded
6258
* {@link #queueCapacity(int) queueCapacity} is configured (the default), the
6359
* max pool size is effectively ignored. See the "Unbounded queues" strategy
6460
* in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for
6561
* more details.
66-
*
6762
* <p>By default this is set to {@code Integer.MAX_VALUE}.
6863
*/
6964
public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
@@ -73,14 +68,11 @@ public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
7368

7469
/**
7570
* Set the queue capacity for the ThreadPoolExecutor.
76-
*
77-
* <p><strong>NOTE:</strong> when an unbounded
78-
* {@link #queueCapacity(int) queueCapacity} is configured (the default) the
79-
* core pool size is effectively the max pool size. This is essentially the
80-
* "Unbounded queues" strategy as explained in
71+
* <p><strong>NOTE:</strong> when an unbounded {@code queueCapacity} is configured
72+
* (the default), the core pool size is effectively the max pool size. This is
73+
* essentially the "Unbounded queues" strategy as explained in
8174
* {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When
8275
* this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored.
83-
*
8476
* <p>By default this is set to {@code Integer.MAX_VALUE}.
8577
*/
8678
public TaskExecutorRegistration queueCapacity(int queueCapacity) {
@@ -92,8 +84,7 @@ public TaskExecutorRegistration queueCapacity(int queueCapacity) {
9284
* Set the time limit for which threads may remain idle before being terminated.
9385
* If there are more than the core number of threads currently in the pool,
9486
* after waiting this amount of time without processing a task, excess threads
95-
* will be terminated. This overrides any value set in the constructor.
96-
*
87+
* will be terminated. This overrides any value set in the constructor.
9788
* <p>By default this is set to 60.
9889
*/
9990
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
@@ -107,6 +98,7 @@ protected ThreadPoolTaskExecutor getTaskExecutor() {
10798
executor.setMaxPoolSize(this.maxPoolSize);
10899
executor.setKeepAliveSeconds(this.keepAliveSeconds);
109100
executor.setQueueCapacity(this.queueCapacity);
101+
executor.setAllowCoreThreadTimeOut(true);
110102
return executor;
111103
}
112104

spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323

2424
import org.w3c.dom.Element;
2525

26-
import org.springframework.beans.factory.config.CustomScopeConfigurer;
2726
import org.springframework.beans.MutablePropertyValues;
2827
import org.springframework.beans.factory.config.BeanDefinition;
2928
import org.springframework.beans.factory.config.ConstructorArgumentValues;
29+
import org.springframework.beans.factory.config.CustomScopeConfigurer;
3030
import org.springframework.beans.factory.config.RuntimeBeanReference;
3131
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
3232
import org.springframework.beans.factory.parsing.CompositeComponentDefinition;
@@ -43,11 +43,11 @@
4343
import org.springframework.messaging.simp.SimpMessagingTemplate;
4444
import org.springframework.messaging.simp.SimpSessionScope;
4545
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
46+
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
47+
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
4648
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
4749
import org.springframework.messaging.simp.user.DefaultUserSessionRegistry;
48-
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
4950
import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
50-
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
5151
import org.springframework.messaging.support.ExecutorSubscribableChannel;
5252
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
5353
import org.springframework.util.Assert;
@@ -61,29 +61,27 @@
6161
import org.springframework.web.socket.server.support.WebSocketHttpRequestHandler;
6262
import org.springframework.web.socket.sockjs.support.SockJsHttpRequestHandler;
6363

64-
6564
/**
66-
* A {@link org.springframework.beans.factory.xml.BeanDefinitionParser}
67-
* that provides the configuration for the
68-
* {@code <websocket:message-broker/>} XML namespace element.
69-
* <p>
70-
* Registers a Spring MVC {@link org.springframework.web.servlet.handler.SimpleUrlHandlerMapping}
71-
* with order=1 to map HTTP WebSocket handshake requests from STOMP/WebSocket clients.
72-
* <p>
73-
* Registers the following {@link org.springframework.messaging.MessageChannel}s:
65+
* A {@link org.springframework.beans.factory.xml.BeanDefinitionParser} that provides
66+
* the configuration for the {@code <websocket:message-broker/>} XML namespace element.
67+
*
68+
* <p>Registers a Spring MVC {@link org.springframework.web.servlet.handler.SimpleUrlHandlerMapping}
69+
* with order 1 to map HTTP WebSocket handshake requests from STOMP/WebSocket clients.
70+
*
71+
* <p>Registers the following {@link org.springframework.messaging.MessageChannel}s:
7472
* <ul>
75-
* <li>"clientInboundChannel" for receiving messages from clients (e.g. WebSocket clients)
76-
* <li>"clientOutboundChannel" for sending messages to clients (e.g. WebSocket clients)
77-
* <li>"brokerChannel" for sending messages from within the application to the message broker
73+
* <li>"clientInboundChannel" for receiving messages from clients (e.g. WebSocket clients)
74+
* <li>"clientOutboundChannel" for sending messages to clients (e.g. WebSocket clients)
75+
* <li>"brokerChannel" for sending messages from within the application to the message broker
7876
* </ul>
79-
* <p>
80-
* Registers one of the following based on the selected message broker options:
77+
*
78+
* <p>Registers one of the following based on the selected message broker options:
8179
* <ul>
82-
* <li> a {@link SimpleBrokerMessageHandler} if the <simple-broker/> is used
83-
* <li> a {@link StompBrokerRelayMessageHandler} if the <stomp-broker-relay/> is used
80+
* <li>a {@link SimpleBrokerMessageHandler} if the <simple-broker/> is used
81+
* <li>a {@link StompBrokerRelayMessageHandler} if the <stomp-broker-relay/> is used
8482
* </ul>
85-
* <p>
86-
* Registers a {@link UserDestinationMessageHandler} for handling user destinations.
83+
*
84+
* <p>Registers a {@link UserDestinationMessageHandler} for handling user destinations.
8785
*
8886
* @author Brian Clozel
8987
* @author Rossen Stoyanchev
@@ -95,7 +93,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
9593

9694
private static final int DEFAULT_MAPPING_ORDER = 1;
9795

98-
private static final boolean jackson2Present= ClassUtils.isPresent(
96+
private static final boolean jackson2Present = ClassUtils.isPresent(
9997
"com.fasterxml.jackson.databind.ObjectMapper", MessageBrokerBeanDefinitionParser.class.getClassLoader());
10098

10199

@@ -215,6 +213,7 @@ private RootBeanDefinition getDefaultExecutorBeanDefinition(String channelName)
215213
executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2);
216214
executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE);
217215
executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE);
216+
executorDef.getPropertyValues().add("allowCoreThreadTimeOut", true);
218217
return executorDef;
219218
}
220219

0 commit comments

Comments
 (0)