From b360a24437ee3010b9c8402460a7ca663ae8f8e5 Mon Sep 17 00:00:00 2001 From: trydofor Date: Mon, 5 Aug 2024 19:54:40 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20executor=20shutdown=20quic?= =?UTF-8?q?kly=20and=20now=20#283?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- observe/docs | 2 +- .../service/impl/TinyMailServiceImpl.java | 35 ++- .../wings-tinymail-service-79.properties | 4 +- .../service/impl/TinyTaskExecServiceImpl.java | 81 ++++--- .../service/impl/TinyTaskServiceImpl.java | 2 +- .../tiny/task/other/ExecutorServiceTest.java | 60 +++-- .../slardar/async/TaskSchedulerHelper.java | 207 ++++++++++++++++-- 7 files changed, 307 insertions(+), 84 deletions(-) diff --git a/observe/docs b/observe/docs index b655b98bf..a60a40517 160000 --- a/observe/docs +++ b/observe/docs @@ -1 +1 @@ -Subproject commit b655b98bfadf5279196e39ed38f494ffe2e0bba0 +Subproject commit a60a40517f41a3414eae2fa42f53d1f6a3a895fe diff --git a/radiant/tiny-mail/src/main/java/pro/fessional/wings/tiny/mail/service/impl/TinyMailServiceImpl.java b/radiant/tiny-mail/src/main/java/pro/fessional/wings/tiny/mail/service/impl/TinyMailServiceImpl.java index 90b3d4211..1151798e6 100644 --- a/radiant/tiny-mail/src/main/java/pro/fessional/wings/tiny/mail/service/impl/TinyMailServiceImpl.java +++ b/radiant/tiny-mail/src/main/java/pro/fessional/wings/tiny/mail/service/impl/TinyMailServiceImpl.java @@ -8,13 +8,14 @@ import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.task.TaskSchedulingProperties; import org.springframework.boot.task.ThreadPoolTaskSchedulerBuilder; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.EventListener; import org.springframework.core.io.Resource; import org.springframework.core.io.ResourceLoader; import org.springframework.mail.MailParseException; @@ -84,7 +85,7 @@ @Service @ConditionalWingsEnabled @Slf4j -public class TinyMailServiceImpl implements TinyMailService, InitializingBean, DisposableBean { +public class TinyMailServiceImpl implements TinyMailService, InitializingBean { @Setter(onMethod_ = { @Value("${spring.application.name}") }) protected String appName; @@ -228,12 +229,30 @@ public void afterPropertiesSet() { }); } - @Override + @EventListener(ContextClosedEvent.class) public void destroy() { isShutdown = true; + if (taskScheduler != null) { taskScheduler.shutdown(); } + + synchronized (asyncMailSched) { + int size = asyncMailSched.size(); + for (ScheduledFuture task : asyncMailSched.values()) { + task.cancel(false); + } + if (size > 0) { + log.info("cancel async mail for shutdown, size={}", size); + } + asyncMailSched.clear(); + } + + ScheduledFuture task = idleScanTask.get(); + if (task != null) { + task.cancel(false); + log.info("cancel async scan mail for shutdown"); + } } @Override @@ -1052,19 +1071,19 @@ private void sendAsyncMail() { } } - private final AtomicInteger trimSchedConter = new AtomicInteger(0); + private final AtomicInteger trimSchedSeq = new AtomicInteger(0); @Setter @Getter private int maxAsyncSched = 1; private void trimAsyncMailSched() { - synchronized (trimSchedConter) { - if (trimSchedConter.get() > maxAsyncSched) return; - trimSchedConter.incrementAndGet(); + synchronized (trimSchedSeq) { + if (trimSchedSeq.get() > maxAsyncSched) return; + trimSchedSeq.incrementAndGet(); } taskScheduler.schedule(() -> { - trimSchedConter.decrementAndGet(); + trimSchedSeq.decrementAndGet(); final int ts; synchronized (asyncMailSched) { diff --git a/radiant/tiny-mail/src/main/resources/wings-conf/wings-tinymail-service-79.properties b/radiant/tiny-mail/src/main/resources/wings-conf/wings-tinymail-service-79.properties index 872510e71..61797385c 100644 --- a/radiant/tiny-mail/src/main/resources/wings-conf/wings-tinymail-service-79.properties +++ b/radiant/tiny-mail/src/main/resources/wings-conf/wings-tinymail-service-79.properties @@ -21,6 +21,6 @@ wings.tiny.mail.service.only-run=true ## TaskSchedulingProperties wings.tiny.mail.service.scheduler.pool.size=2 -wings.tiny.mail.service.scheduler.shutdown.await-termination=true -wings.tiny.mail.service.scheduler.shutdown.await-termination-period=60s +wings.tiny.mail.service.scheduler.shutdown.await-termination=false +#wings.tiny.mail.service.scheduler.shutdown.await-termination-period=15s wings.tiny.mail.service.scheduler.thread-name-prefix=mail- diff --git a/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskExecServiceImpl.java b/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskExecServiceImpl.java index bebb7d792..61c73a7e6 100644 --- a/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskExecServiceImpl.java +++ b/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskExecServiceImpl.java @@ -4,10 +4,11 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jooq.Field; -import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.PeriodicTrigger; @@ -15,6 +16,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Propagation; import pro.fessional.mirana.cast.BoxedCastUtil; +import pro.fessional.mirana.data.U; import pro.fessional.mirana.lock.JvmStaticGlobalLock; import pro.fessional.mirana.pain.ThrowableUtil; import pro.fessional.mirana.stat.JvmStat; @@ -72,9 +74,9 @@ @Service @ConditionalWingsEnabled @Slf4j -public class TinyTaskExecServiceImpl implements TinyTaskExecService, InitializingBean, DisposableBean { +public class TinyTaskExecServiceImpl implements TinyTaskExecService, InitializingBean { - protected static final ConcurrentHashMap> Handle = new ConcurrentHashMap<>(); + protected static final ConcurrentHashMap, Long, String>> Handle = new ConcurrentHashMap<>(); protected static final ConcurrentHashMap Cancel = new ConcurrentHashMap<>(); protected static final ConcurrentHashMap Booted = new ConcurrentHashMap<>(); protected static final ConcurrentHashMap Untune = new ConcurrentHashMap<>(); @@ -99,23 +101,27 @@ public class TinyTaskExecServiceImpl implements TinyTaskExecService, Initializin @Setter(onMethod_ = { @Autowired }) protected TinyTaskExecProp execProp; - protected volatile boolean isShutdown = false; + protected volatile boolean isShutdownFast = false; + protected volatile boolean isShutdownSlow = false; @Override public void afterPropertiesSet() { - isShutdown = false; + isShutdownFast = false; + isShutdownSlow = false; } - @Override + @EventListener(ContextClosedEvent.class) public void destroy() { - isShutdown = true; + log.info("tiny-task shutdown for ContextClosedEvent"); + isShutdownFast = true; + isShutdownSlow = true; for (var en : Handle.entrySet()) { - var task = en.getValue(); - if (task.isDone()) continue; - - log.info("try to cancal tiny-task for shutdown, id={}", en.getKey()); + var tvl = en.getValue(); + var task = tvl.one(); + log.info("try to cancal tiny-task for shutdown, id={}, key={}, next_sys={}", en.getKey(), tvl.three(), DateLocaling.sysLdt(tvl.two())); task.cancel(false); } + Handle.clear(); } @Override @@ -126,19 +132,31 @@ public boolean launch(long id) { @Override public boolean force(long id) { - if (isShutdown) { - log.warn("skip tiny-task for shutdwon, force id={}", id); + if (isShutdownFast && isShutdownSlow) { + log.warn("cancel tiny-task for shutdown all on force, id={}", id); return false; } final WinTaskDefine td = winTaskDefineDao.fetchOneById(id); if (td == null) { - log.info("skip tiny-task for not found, id={}", id); + log.info("skip tiny-task for not found on force, id={}", id); return false; } final boolean fast = BoxedCastUtil.orTrue(td.getTaskerFast()); - final var scheduler = fast ? TaskSchedulerHelper.Fast() : TaskSchedulerHelper.Scheduled(); + if (fast && isShutdownFast || !fast && isShutdownSlow) { + log.warn("cancal tiny-task for shutdown on force, fast={}, id={}", fast, id); + return false; + } + + final var scheduler = TaskSchedulerHelper.Scheduler(fast); + if (scheduler.getScheduledExecutor().isShutdown()) { + log.warn("cancal tiny-task for Executor shutdown on force, fast={}, id={}", fast, id); + if (fast) isShutdownFast = true; + else isShutdownSlow = true; + return false; + } + scheduler.schedule(() -> { long execTms = ThreadNow.millis(); long doneTms = -1; @@ -190,22 +208,23 @@ public boolean force(long id) { } } }, Instant.ofEpochMilli(ThreadNow.millis())); + return true; } @Override public boolean cancel(long id) { Cancel.put(id, Boolean.TRUE); - final ScheduledFuture ft = Handle.get(id); - if (ft == null) { + final var tvl = Handle.get(id); + if (tvl == null) { log.info("cancel not found, id={}", id); return true; } - final boolean r = ft.cancel(false); + final boolean r = tvl.one().cancel(false); if (r) { Handle.remove(id); } - log.info("cancel success={}, id={}", r, id); + log.info("cancel success={}, id={}, key={}, next_sys={}", id, tvl.three(), DateLocaling.sysLdt(tvl.two())); return r; } @@ -220,8 +239,8 @@ public Set running() { } private boolean relaunch(long id) { - if (isShutdown) { - log.warn("skip tiny-task for shutdwon, relaunch id={}", id); + if (isShutdownFast && isShutdownSlow) { + log.warn("cancel tiny-task for shutdown all on relaunch, id={}", id); return false; } @@ -229,13 +248,13 @@ private boolean relaunch(long id) { try { lock.lock(); if (Handle.containsKey(id)) { - log.info("skip tiny-task for launching, id={}", id); + log.info("skip tiny-task on launching, id={}", id); return false; } final WinTaskDefine td = winTaskDefineDao.fetchOneById(id); if (td == null) { - log.info("skip tiny-task for not found, relaunch id={}", id); + log.info("skip tiny-task for not found on relaunch id={}", id); return false; } @@ -253,15 +272,21 @@ private boolean relaunch(long id) { saveNextExec(next, td); final boolean fast = BoxedCastUtil.orTrue(td.getTaskerFast()); - final var taskScheduler = fast ? TaskSchedulerHelper.Fast() : TaskSchedulerHelper.Scheduled(); + if (fast && isShutdownFast || !fast && isShutdownSlow) { + log.warn("cancal tiny-task for shutdown on relaunch, fast={}, id={}", fast, id); + return false; + } - if (taskScheduler.getScheduledExecutor().isShutdown()) { - log.error("TaskScheduler={} is shutdown, id={}, prop={}", fast, id, key); + final var scheduler = TaskSchedulerHelper.Scheduler(fast); + if (scheduler.getScheduledExecutor().isShutdown()) { + log.warn("cancal tiny-task for Executor shutdown on relaunch, fast={}, id={}, prop={}", fast, id, key); + if (fast) isShutdownFast = true; + else isShutdownSlow = true; return false; } log.info("prepare tiny-task id={}, prop={}", id, key); - final ScheduledFuture handle = taskScheduler.schedule(() -> { + final ScheduledFuture handle = scheduler.schedule(() -> { long execTms = ThreadNow.millis(); try { if (notNextLock(td, execTms)) { @@ -333,7 +358,7 @@ private boolean relaunch(long id) { }, Instant.ofEpochMilli(next)); // - Handle.put(id, handle); + Handle.put(id, U.of(handle, next, key)); return true; } finally { diff --git a/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskServiceImpl.java b/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskServiceImpl.java index 8cf11013f..23152963a 100644 --- a/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskServiceImpl.java +++ b/radiant/tiny-task/src/main/java/pro/fessional/wings/tiny/task/service/impl/TinyTaskServiceImpl.java @@ -36,7 +36,7 @@ public class TinyTaskServiceImpl implements TinyTaskService { @Override @NotNull public ThreadPoolTaskScheduler referScheduler(boolean fast) { - return fast ? TaskSchedulerHelper.Fast() : TaskSchedulerHelper.Scheduled(); + return TaskSchedulerHelper.Scheduler(fast); } @Override diff --git a/radiant/tiny-task/src/test/java/pro/fessional/wings/tiny/task/other/ExecutorServiceTest.java b/radiant/tiny-task/src/test/java/pro/fessional/wings/tiny/task/other/ExecutorServiceTest.java index 9110285cd..2d1159140 100644 --- a/radiant/tiny-task/src/test/java/pro/fessional/wings/tiny/task/other/ExecutorServiceTest.java +++ b/radiant/tiny-task/src/test/java/pro/fessional/wings/tiny/task/other/ExecutorServiceTest.java @@ -2,14 +2,13 @@ import io.qameta.allure.TmsLink; import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import pro.fessional.mirana.time.Sleep; import pro.fessional.wings.slardar.async.TaskSchedulerHelper; import pro.fessional.wings.testing.silencer.TestingLoggerAssert; -import java.time.Instant; import java.util.concurrent.ScheduledFuture; /** @@ -17,8 +16,8 @@ * @since 2022-12-13 */ @SpringBootTest(properties = { - "logging.level.root=DEBUG", // AssertionLogger - "spring.task.scheduling.shutdown.await-termination-period=2s", + "logging.level.root=DEBUG", // AssertionLogger + "spring.task.scheduling.shutdown.await-termination-period=2s", }) @Slf4j class ExecutorServiceTest { @@ -38,54 +37,53 @@ void cancelSchedule() { al.rule(".. cancel", event -> event.getFormattedMessage().contains(".. cancel=true")); al.start(); - final ThreadPoolTaskScheduler scheduler = TaskSchedulerHelper.Scheduled(); - final ScheduledFuture f1 = scheduler.schedule(() -> log.info("-1 run={}", System.currentTimeMillis()), - Instant.ofEpochMilli(System.currentTimeMillis() - 1000)); + final ScheduledFuture f1 = TaskSchedulerHelper.Scheduled(-1000L, () -> log.info("-1 run={}", System.currentTimeMillis())); Sleep.ignoreInterrupt(500); log.info("-1 cancel={}", f1.cancel(false)); - final ScheduledFuture f2 = scheduler.schedule(() -> log.info("=0 run={}", System.currentTimeMillis()), - Instant.ofEpochMilli(System.currentTimeMillis())); + final ScheduledFuture f2 = TaskSchedulerHelper.Scheduled(0L, () -> log.info("=0 run={}", System.currentTimeMillis())); Sleep.ignoreInterrupt(500); log.info("=0 cancel={}", f2.cancel(false)); - final ScheduledFuture f3 = scheduler.schedule(() -> log.info("+1 run={}", System.currentTimeMillis()), - Instant.ofEpochMilli(System.currentTimeMillis() + 1000)); + final ScheduledFuture f3 = TaskSchedulerHelper.Scheduled(1000L, () -> log.info("+1 run={}", System.currentTimeMillis())); Sleep.ignoreInterrupt(500); log.info("+1 cancel={}", f3.cancel(false)); - final ScheduledFuture f4 = scheduler.schedule(() -> { - for (int i = 0; i < 10; i++) { - log.info("== run={}", i); - Sleep.ignoreInterrupt(100); - } - }, - Instant.ofEpochMilli(System.currentTimeMillis())); + final ScheduledFuture f4 = TaskSchedulerHelper.Scheduled(0L, () -> { + for (int i = 0; i < 10; i++) { + log.info("== run={}", i); + Sleep.ignoreInterrupt(100); + } + }); Sleep.ignoreInterrupt(500); log.info("== cancel={}", f4.cancel(false)); - final ScheduledFuture f5 = scheduler.schedule(() -> { - for (int i = 0; i < 10; i++) { - log.info(".. run={}", i); - try { - Thread.sleep(100); - } - catch (InterruptedException e) { - break; - } - } - }, - Instant.ofEpochMilli(System.currentTimeMillis())); + final ScheduledFuture f5 = TaskSchedulerHelper.Scheduled(0L, () -> { + for (int i = 0; i < 10; i++) { + log.info(".. run={}", i); + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + break; + } + } + }); + + int size = TaskSchedulerHelper.runningSize(); + Assertions.assertTrue(size >= 1, "size=" + size); + Sleep.ignoreInterrupt(500); log.info(".. cancel={}", f5.cancel(true)); Sleep.ignoreInterrupt(2000); log.info("== done="); - al.stop(); al.assertCount(1); al.uninstall(); + + Assertions.assertEquals(0, TaskSchedulerHelper.runningSize()); } } diff --git a/wings/slardar/src/main/java/pro/fessional/wings/slardar/async/TaskSchedulerHelper.java b/wings/slardar/src/main/java/pro/fessional/wings/slardar/async/TaskSchedulerHelper.java index cea75834b..2335ca7a6 100644 --- a/wings/slardar/src/main/java/pro/fessional/wings/slardar/async/TaskSchedulerHelper.java +++ b/wings/slardar/src/main/java/pro/fessional/wings/slardar/async/TaskSchedulerHelper.java @@ -1,6 +1,7 @@ package pro.fessional.wings.slardar.async; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.springframework.beans.factory.DisposableBean; import org.springframework.boot.task.ThreadPoolTaskSchedulerBuilder; import org.springframework.scheduling.Trigger; @@ -9,7 +10,12 @@ import java.time.Instant; import java.util.Objects; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Delayed; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * @author trydofor @@ -17,6 +23,43 @@ */ public class TaskSchedulerHelper implements DisposableBean { + public static final ScheduledFuture NoFuture = new ScheduledFuture<>() { + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return true; + } + + @Override + public boolean isCancelled() { + return true; + } + + @Override + public boolean isDone() { + return true; + } + + @Override + public Object get() { + throw new CancellationException("No-op"); + } + + @Override + public Object get(long timeout, TimeUnit unit) { + throw new CancellationException("No-op"); + } + + @Override + public long getDelay(TimeUnit unit) { + return 0; + } + + @Override + public int compareTo(Delayed other) { + return 0; + } + }; + private static ThreadPoolTaskScheduler FastScheduler; private static ThreadPoolTaskScheduler ScheduledScheduler; private static ThreadPoolTaskSchedulerBuilder FastBuilder; @@ -36,6 +79,10 @@ protected TaskSchedulerHelper(@NotNull ThreadPoolTaskScheduler fast, @NotNull Th @Override public void destroy() { helperPrepared = false; + for (var task : TaskFuture.values()) { + task.cancel(false); + } + TaskFuture.clear(); } /** @@ -52,9 +99,11 @@ public static TtlThreadPoolTaskScheduler Ttl(ThreadPoolTaskSchedulerBuilder buil return builder.configure(new TtlThreadPoolTaskScheduler()); } - /** - * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME - */ + @NotNull + public static ThreadPoolTaskScheduler Scheduler(boolean fast) { + return fast ? Fast() : Scheduled(); + } + @NotNull public static ThreadPoolTaskScheduler Fast() { if (FastScheduler == null) { @@ -64,7 +113,7 @@ public static ThreadPoolTaskScheduler Fast() { } /** - * see NamingSlardarConst#slardarHeavyScheduler + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME */ @NotNull public static ThreadPoolTaskScheduler Scheduled() { @@ -78,7 +127,7 @@ public static ThreadPoolTaskScheduler Scheduled() { /** * just like default @Scheduled * - * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME */ public static void Scheduled(@NotNull Runnable task) { Scheduled().execute(task); @@ -87,28 +136,160 @@ public static void Scheduled(@NotNull Runnable task) { /** * just like default @Scheduled * - * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Instant) */ + @NotNull public static ScheduledFuture Scheduled(long delayMs, @NotNull Runnable task) { - return Scheduled().schedule(task, Instant.ofEpochMilli(ThreadNow.millis() + delayMs)); + return Scheduled(Scheduled(), delayMs, task); + } + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Instant) + */ + @NotNull + public static ScheduledFuture Scheduled(@NotNull Instant start, @NotNull Runnable task) { + return Scheduled(Scheduled(), start, task); + } + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Trigger) + */ + @Nullable + public static ScheduledFuture Scheduled(@NotNull Trigger trigger, @NotNull Runnable task) { + return Scheduled(Scheduled(), trigger, task); + } + + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + */ + public static void Scheduled(boolean fast, @NotNull Runnable task) { + Scheduler(fast).execute(task); + } + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Instant) + */ + @NotNull + public static ScheduledFuture Scheduled(boolean fast, long delayMs, @NotNull Runnable task) { + return Scheduled(Scheduler(fast), delayMs, task); } /** * just like default @Scheduled * - * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Instant) */ - public static ScheduledFuture Scheduled(Instant start, @NotNull Runnable task) { - return Scheduled().schedule(task, start); + @NotNull + public static ScheduledFuture Scheduled(boolean fast, @NotNull Instant start, @NotNull Runnable task) { + return Scheduled(Scheduler(fast), start, task); } /** * just like default @Scheduled * - * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Trigger) + */ + @Nullable + public static ScheduledFuture Scheduled(boolean fast, @NotNull Trigger trigger, @NotNull Runnable task) { + return Scheduled(Scheduler(fast), trigger, task); + } + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Instant) + */ + @NotNull + public static ScheduledFuture Scheduled(@NotNull ThreadPoolTaskScheduler scheduler, long delayMs, @NotNull Runnable task) { + return Scheduled(scheduler, Instant.ofEpochMilli(ThreadNow.millis() + delayMs), task); + } + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Instant) */ - public static ScheduledFuture Scheduled(Trigger trigger, @NotNull Runnable task) { - return Scheduled().schedule(task, trigger); + @NotNull + public static ScheduledFuture Scheduled(@NotNull ThreadPoolTaskScheduler scheduler, @NotNull Instant start, @NotNull Runnable task) { + final Long seq = TaskSeq.incrementAndGet(); + TaskFuture.put(seq, NoFuture); + final var future = scheduler.schedule(new Task(seq, task), start); + if (future.isDone()) { + TaskFuture.remove(seq); + } + else { + TaskFuture.computeIfPresent(seq, (k, v) -> future); + } + return future; + } + + /** + * just like default @Scheduled + * + * @see org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor#DEFAULT_TASK_SCHEDULER_BEAN_NAME + * @see ThreadPoolTaskScheduler#schedule(Runnable, Trigger) + */ + @Nullable + public static ScheduledFuture Scheduled(@NotNull ThreadPoolTaskScheduler scheduler, @NotNull Trigger trigger, @NotNull Runnable task) { + final Long seq = TaskSeq.incrementAndGet(); + TaskFuture.put(seq, NoFuture); + final var future = scheduler.schedule(new Task(seq, task), trigger); + if (future == null || future.isDone()) { + TaskFuture.remove(seq); + } + else { + TaskFuture.computeIfPresent(seq, (k, v) -> future); + } + return future; + } + + /** + * clean done task and get the running size + */ + public static int runningSize() { + TaskFuture.entrySet().removeIf(en -> en.getValue().isDone()); + return TaskFuture.size(); + } + + private static final ConcurrentHashMap> TaskFuture = new ConcurrentHashMap<>(); + private static final AtomicLong TaskSeq = new AtomicLong(0); + + + private static class Task implements Runnable { + private final Long seq; + private final Runnable runnable; + + public Task(Long seq, Runnable runnable) { + this.seq = seq; + this.runnable = runnable; + } + + @Override + public void run() { + try { + runnable.run(); + } + finally { + TaskFuture.remove(seq); + } + } } /**