1818
1919import java .util .concurrent .Callable ;
2020import java .util .concurrent .ExecutorService ;
21- import java .util .concurrent .Executors ;
2221import java .util .concurrent .Future ;
2322import java .util .concurrent .TimeUnit ;
2423import java .util .concurrent .locks .Lock ;
2524
26- import org .apache .commons .logging .Log ;
27- import org .apache .commons .logging .LogFactory ;
28-
2925import org .springframework .beans .factory .DisposableBean ;
3026import org .springframework .context .ApplicationEventPublisher ;
3127import org .springframework .context .ApplicationEventPublisherAware ;
3228import org .springframework .context .SmartLifecycle ;
29+ import org .springframework .core .log .LogAccessor ;
30+ import org .springframework .core .task .SimpleAsyncTaskExecutor ;
31+ import org .springframework .core .task .TaskExecutor ;
32+ import org .springframework .core .task .support .ExecutorServiceAdapter ;
3333import org .springframework .integration .leader .Candidate ;
3434import org .springframework .integration .leader .Context ;
3535import org .springframework .integration .leader .DefaultCandidate ;
3636import org .springframework .integration .leader .event .DefaultLeaderEventPublisher ;
3737import org .springframework .integration .leader .event .LeaderEventPublisher ;
3838import org .springframework .integration .support .locks .LockRegistry ;
39- import org .springframework .scheduling .concurrent .CustomizableThreadFactory ;
4039import org .springframework .util .Assert ;
4140
4241/**
@@ -67,9 +66,7 @@ public class LockRegistryLeaderInitiator implements SmartLifecycle, DisposableBe
6766
6867 public static final long DEFAULT_BUSY_WAIT_TIME = 50L ;
6968
70- private static final Log LOGGER = LogFactory .getLog (LockRegistryLeaderInitiator .class );
71-
72- private final Object lifecycleMonitor = new Object ();
69+ private static final LogAccessor LOGGER = new LogAccessor (LockRegistryLeaderInitiator .class );
7370
7471 /**
7572 * A lock registry. The locks it manages should be global (whatever that means for the
@@ -104,13 +101,7 @@ public String getRole() {
104101 * Executor service for running leadership daemon.
105102 */
106103 private ExecutorService executorService =
107- Executors .newSingleThreadExecutor (new CustomizableThreadFactory ("lock-leadership-" ));
108-
109- /**
110- * Flag to denote whether the {@link ExecutorService} was provided via the setter and
111- * thus should not be shutdown when {@link #destroy()} is called.
112- */
113- private boolean executorServiceExplicitlySet ;
104+ new ExecutorServiceAdapter (new SimpleAsyncTaskExecutor ("lock-leadership-" ));
114105
115106 /**
116107 * Time in milliseconds to wait in between attempts to re-acquire the lock, once it is
@@ -192,10 +183,20 @@ public LockRegistryLeaderInitiator(LockRegistry locks, Candidate candidate) {
192183 * single thread Executor will be used.
193184 * @param executorService the executor service
194185 * @since 5.0.2
186+ * @deprecated since 6.2 in favor of {@link #setTaskExecutor(TaskExecutor)}
195187 */
188+ @ Deprecated (since = "6.2" , forRemoval = true )
196189 public void setExecutorService (ExecutorService executorService ) {
197190 this .executorService = executorService ;
198- this .executorServiceExplicitlySet = true ;
191+ }
192+
193+ /**
194+ * Set a {@link TaskExecutor} for running leadership daemon.
195+ * @param taskExecutor the {@link TaskExecutor} to use.
196+ * @since 6.2
197+ */
198+ public void setTaskExecutor (TaskExecutor taskExecutor ) {
199+ this .executorService = new ExecutorServiceAdapter (taskExecutor );
199200 }
200201
201202 public void setHeartBeatMillis (long heartBeatMillis ) {
@@ -224,9 +225,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
224225 */
225226 @ Override
226227 public boolean isRunning () {
227- synchronized (this .lifecycleMonitor ) {
228- return this .running ;
229- }
228+ return this .running ;
230229 }
231230
232231 @ Override
@@ -287,43 +286,36 @@ public void setPublishFailedEvents(boolean publishFailedEvents) {
287286 * Start the registration of the {@link #candidate} for leader election.
288287 */
289288 @ Override
290- public void start () {
289+ public synchronized void start () {
291290 if (this .leaderEventPublisher == null && this .applicationEventPublisher != null ) {
292291 this .leaderEventPublisher = new DefaultLeaderEventPublisher (this .applicationEventPublisher );
293292 }
294- synchronized (this .lifecycleMonitor ) {
295- if (!this .running ) {
296- this .leaderSelector = new LeaderSelector (buildLeaderPath ());
297- this .running = true ;
298- this .future = this .executorService .submit (this .leaderSelector );
299- LOGGER .debug ("Started LeaderInitiator" );
300- }
293+ if (!this .running ) {
294+ this .leaderSelector = new LeaderSelector (buildLeaderPath ());
295+ this .running = true ;
296+ this .future = this .executorService .submit (this .leaderSelector );
297+ LOGGER .debug ("Started LeaderInitiator" );
301298 }
302299 }
303300
304301 @ Override
305302 public void destroy () {
306303 stop ();
307- if (!this .executorServiceExplicitlySet ) {
308- this .executorService .shutdown ();
309- }
310304 }
311305
312306 /**
313307 * Stop the registration of the {@link #candidate} for leader election. If the
314308 * candidate is currently leader, its leadership will be revoked.
315309 */
316310 @ Override
317- public void stop () {
318- synchronized (this .lifecycleMonitor ) {
319- if (this .running ) {
320- this .running = false ;
321- if (this .future != null ) {
322- this .future .cancel (true );
323- }
324- this .future = null ;
325- LOGGER .debug ("Stopped LeaderInitiator for " + getContext ());
311+ public synchronized void stop () {
312+ if (this .running ) {
313+ this .running = false ;
314+ if (this .future != null ) {
315+ this .future .cancel (true );
326316 }
317+ this .future = null ;
318+ LOGGER .debug (() -> "Stopped LeaderInitiator for " + getContext ());
327319 }
328320 }
329321
@@ -382,9 +374,9 @@ public Void call() {
382374 try {
383375 this .lock .unlock ();
384376 }
385- catch (Exception e ) {
386- LOGGER .debug ("Could not unlock during stop for " + this . context
387- + " - treat as broken. Revoking..." , e );
377+ catch (Exception ex ) {
378+ LOGGER .debug (ex , () ->
379+ "Could not unlock during stop for " + this . context + " - treat as broken. Revoking..." );
388380 }
389381 // We are stopping, therefore not leading anymore
390382 handleRevoked ();
@@ -394,9 +386,7 @@ public Void call() {
394386 }
395387
396388 private void tryAcquireLock () throws InterruptedException {
397- if (LOGGER .isDebugEnabled ()) {
398- LOGGER .debug ("Acquiring the lock for " + this .context );
399- }
389+ LOGGER .debug (() -> "Acquiring the lock for " + this .context );
400390 // We always try to acquire the lock, in case it expired
401391 boolean acquired =
402392 this .lock .tryLock (LockRegistryLeaderInitiator .this .heartBeatMillis , TimeUnit .MILLISECONDS );
@@ -436,8 +426,8 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
436426 this .lock .unlock ();
437427 }
438428 catch (Exception e1 ) {
439- LOGGER .debug ("Could not unlock - treat as broken " + this .context +
440- ". Revoking " + (isRunning () ? " and retrying..." : "..." ), e1 );
429+ LOGGER .debug (e1 , () -> "Could not unlock - treat as broken " + this .context +
430+ ". Revoking " + (isRunning () ? " and retrying..." : "..." ));
441431
442432 }
443433 // The lock was broken and we are no longer leader
@@ -462,16 +452,14 @@ private boolean unlockAndHandleException(Exception ex) { // NOSONAR
462452 Thread .currentThread ().interrupt ();
463453 }
464454 }
465- if (LOGGER .isDebugEnabled ()) {
466- LOGGER .debug ("Error acquiring the lock for " + this .context +
467- ". " + (isRunning () ? "Retrying..." : "" ), ex );
468- }
455+ LOGGER .debug (ex , () ->
456+ "Error acquiring the lock for " + this .context + ". " + (isRunning () ? "Retrying..." : "" ));
469457 }
470458 return false ;
471459 }
472460
473461 private void restartSelectorBecauseOfError (Exception ex ) {
474- LOGGER .warn ("Restarting LeaderSelector for " + this .context + " because of error." , ex );
462+ LOGGER .warn (ex , () -> "Restarting LeaderSelector for " + this .context + " because of error." );
475463 LockRegistryLeaderInitiator .this .future =
476464 LockRegistryLeaderInitiator .this .executorService .submit (
477465 () -> {
@@ -492,8 +480,8 @@ private void handleGranted() throws InterruptedException {
492480 LockRegistryLeaderInitiator .this .leaderEventPublisher .publishOnGranted (
493481 LockRegistryLeaderInitiator .this , this .context , this .lockKey );
494482 }
495- catch (Exception e ) {
496- LOGGER .warn ("Error publishing OnGranted event." , e );
483+ catch (Exception ex ) {
484+ LOGGER .warn (ex , "Error publishing OnGranted event." );
497485 }
498486 }
499487 }
@@ -506,8 +494,8 @@ private void handleRevoked() {
506494 LockRegistryLeaderInitiator .this , this .context ,
507495 LockRegistryLeaderInitiator .this .candidate .getRole ());
508496 }
509- catch (Exception e ) {
510- LOGGER .warn ("Error publishing OnRevoked event." , e );
497+ catch (Exception ex ) {
498+ LOGGER .warn (ex , "Error publishing OnRevoked event." );
511499 }
512500 }
513501 }
@@ -520,8 +508,8 @@ private void publishFailedToAcquire() {
520508 this .context ,
521509 LockRegistryLeaderInitiator .this .candidate .getRole ());
522510 }
523- catch (Exception e ) {
524- LOGGER .warn ("Error publishing OnFailedToAcquire event." , e );
511+ catch (Exception ex ) {
512+ LOGGER .warn (ex , "Error publishing OnFailedToAcquire event." );
525513 }
526514 }
527515 }
@@ -543,9 +531,7 @@ public boolean isLeader() {
543531
544532 @ Override
545533 public void yield () {
546- if (LOGGER .isDebugEnabled ()) {
547- LOGGER .debug ("Yielding leadership from " + this );
548- }
534+ LOGGER .debug (() -> "Yielding leadership from " + this );
549535 LockRegistryLeaderInitiator .this .leaderSelector .yielding = true ;
550536 }
551537
0 commit comments