|
27 | 27 | import org.springframework.context.ApplicationEventPublisherAware; |
28 | 28 | import org.springframework.context.SmartLifecycle; |
29 | 29 | import org.springframework.core.log.LogAccessor; |
| 30 | +import org.springframework.core.task.AsyncTaskExecutor; |
30 | 31 | import org.springframework.core.task.SimpleAsyncTaskExecutor; |
31 | | -import org.springframework.core.task.TaskExecutor; |
32 | | -import org.springframework.core.task.support.ExecutorServiceAdapter; |
| 32 | +import org.springframework.core.task.support.TaskExecutorAdapter; |
33 | 33 | import org.springframework.integration.leader.Candidate; |
34 | 34 | import org.springframework.integration.leader.Context; |
35 | 35 | import org.springframework.integration.leader.DefaultCandidate; |
@@ -100,8 +100,7 @@ public String getRole() { |
100 | 100 | /** |
101 | 101 | * Executor service for running leadership daemon. |
102 | 102 | */ |
103 | | - private ExecutorService executorService = |
104 | | - new ExecutorServiceAdapter(new SimpleAsyncTaskExecutor("lock-leadership-")); |
| 103 | + private AsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("lock-leadership-"); |
105 | 104 |
|
106 | 105 | /** |
107 | 106 | * Time in milliseconds to wait in between attempts to re-acquire the lock, once it is |
@@ -152,7 +151,7 @@ public String getRole() { |
152 | 151 |
|
153 | 152 | /** |
154 | 153 | * Future returned by submitting an {@link LeaderSelector} to |
155 | | - * {@link #executorService}. This is used to cancel leadership. |
| 154 | + * {@link #taskExecutor}. This is used to cancel leadership. |
156 | 155 | */ |
157 | 156 | private volatile Future<?> future; |
158 | 157 |
|
@@ -181,22 +180,23 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) { |
181 | 180 | /** |
182 | 181 | * Set the {@link ExecutorService}, where is not provided then a default of |
183 | 182 | * single thread Executor will be used. |
184 | | - * @param executorService the executor service |
| 183 | + * @param taskExecutor the executor service |
185 | 184 | * @since 5.0.2 |
186 | | - * @deprecated since 6.2 in favor of {@link #setTaskExecutor(TaskExecutor)} |
| 185 | + * @deprecated since 6.2 in favor of {@link #setTaskExecutor(AsyncTaskExecutor)} |
187 | 186 | */ |
188 | 187 | @Deprecated(since = "6.2", forRemoval = true) |
189 | | - public void setExecutorService(ExecutorService executorService) { |
190 | | - this.executorService = executorService; |
| 188 | + public void setTaskExecutor(ExecutorService taskExecutor) { |
| 189 | + setTaskExecutor(new TaskExecutorAdapter(taskExecutor)); |
191 | 190 | } |
192 | 191 |
|
193 | 192 | /** |
194 | | - * Set a {@link TaskExecutor} for running leadership daemon. |
195 | | - * @param taskExecutor the {@link TaskExecutor} to use. |
| 193 | + * Set a {@link AsyncTaskExecutor} for running leadership daemon. |
| 194 | + * @param taskExecutor the {@link AsyncTaskExecutor} to use. |
196 | 195 | * @since 6.2 |
197 | 196 | */ |
198 | | - public void setTaskExecutor(TaskExecutor taskExecutor) { |
199 | | - this.executorService = new ExecutorServiceAdapter(taskExecutor); |
| 197 | + public void setTaskExecutor(AsyncTaskExecutor taskExecutor) { |
| 198 | + Assert.notNull(taskExecutor, "A 'taskExecutor' must not be null."); |
| 199 | + this.taskExecutor = taskExecutor; |
200 | 200 | } |
201 | 201 |
|
202 | 202 | public void setHeartBeatMillis(long heartBeatMillis) { |
@@ -293,7 +293,7 @@ public synchronized void start() { |
293 | 293 | if (!this.running) { |
294 | 294 | this.leaderSelector = new LeaderSelector(buildLeaderPath()); |
295 | 295 | this.running = true; |
296 | | - this.future = this.executorService.submit(this.leaderSelector); |
| 296 | + this.future = this.taskExecutor.submit(this.leaderSelector); |
297 | 297 | LOGGER.debug("Started LeaderInitiator"); |
298 | 298 | } |
299 | 299 | } |
@@ -461,7 +461,7 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR |
461 | 461 | private void restartSelectorBecauseOfError(Exception ex) { |
462 | 462 | LOGGER.warn(ex, () -> "Restarting LeaderSelector for " + this.context + " because of error."); |
463 | 463 | LockRegistryLeaderInitiator.this.future = |
464 | | - LockRegistryLeaderInitiator.this.executorService.submit( |
| 464 | + LockRegistryLeaderInitiator.this.taskExecutor.submit( |
465 | 465 | () -> { |
466 | 466 | // Give it a chance to elect some other leader. |
467 | 467 | Thread.sleep(LockRegistryLeaderInitiator.this.busyWaitMillis); |
|
0 commit comments