Skip to content

Commit

Permalink
♻️ executor shutdown quickly and now #283
Browse files Browse the repository at this point in the history
  • Loading branch information
trydofor committed Aug 6, 2024
1 parent 9e63d82 commit b360a24
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 84 deletions.
2 changes: 1 addition & 1 deletion observe/docs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.task.TaskSchedulingProperties;
import org.springframework.boot.task.ThreadPoolTaskSchedulerBuilder;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.mail.MailParseException;
Expand Down Expand Up @@ -84,7 +85,7 @@
@Service
@ConditionalWingsEnabled
@Slf4j
public class TinyMailServiceImpl implements TinyMailService, InitializingBean, DisposableBean {
public class TinyMailServiceImpl implements TinyMailService, InitializingBean {

@Setter(onMethod_ = { @Value("${spring.application.name}") })
protected String appName;
Expand Down Expand Up @@ -228,12 +229,30 @@ public void afterPropertiesSet() {
});
}

@Override
@EventListener(ContextClosedEvent.class)
public void destroy() {
isShutdown = true;

if (taskScheduler != null) {
taskScheduler.shutdown();
}

synchronized (asyncMailSched) {
int size = asyncMailSched.size();
for (ScheduledFuture<?> task : asyncMailSched.values()) {
task.cancel(false);
}
if (size > 0) {
log.info("cancel async mail for shutdown, size={}", size);
}
asyncMailSched.clear();
}

ScheduledFuture<?> task = idleScanTask.get();
if (task != null) {
task.cancel(false);
log.info("cancel async scan mail for shutdown");
}
}

@Override
Expand Down Expand Up @@ -1052,19 +1071,19 @@ private void sendAsyncMail() {
}
}

private final AtomicInteger trimSchedConter = new AtomicInteger(0);
private final AtomicInteger trimSchedSeq = new AtomicInteger(0);
@Setter
@Getter
private int maxAsyncSched = 1;

private void trimAsyncMailSched() {
synchronized (trimSchedConter) {
if (trimSchedConter.get() > maxAsyncSched) return;
trimSchedConter.incrementAndGet();
synchronized (trimSchedSeq) {
if (trimSchedSeq.get() > maxAsyncSched) return;
trimSchedSeq.incrementAndGet();
}

taskScheduler.schedule(() -> {
trimSchedConter.decrementAndGet();
trimSchedSeq.decrementAndGet();

final int ts;
synchronized (asyncMailSched) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ wings.tiny.mail.service.only-run=true

## TaskSchedulingProperties
wings.tiny.mail.service.scheduler.pool.size=2
wings.tiny.mail.service.scheduler.shutdown.await-termination=true
wings.tiny.mail.service.scheduler.shutdown.await-termination-period=60s
wings.tiny.mail.service.scheduler.shutdown.await-termination=false
#wings.tiny.mail.service.scheduler.shutdown.await-termination-period=15s
wings.tiny.mail.service.scheduler.thread-name-prefix=mail-
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jooq.Field;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import pro.fessional.mirana.cast.BoxedCastUtil;
import pro.fessional.mirana.data.U;
import pro.fessional.mirana.lock.JvmStaticGlobalLock;
import pro.fessional.mirana.pain.ThrowableUtil;
import pro.fessional.mirana.stat.JvmStat;
Expand Down Expand Up @@ -72,9 +74,9 @@
@Service
@ConditionalWingsEnabled
@Slf4j
public class TinyTaskExecServiceImpl implements TinyTaskExecService, InitializingBean, DisposableBean {
public class TinyTaskExecServiceImpl implements TinyTaskExecService, InitializingBean {

protected static final ConcurrentHashMap<Long, ScheduledFuture<?>> Handle = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<Long, U.Three<ScheduledFuture<?>, Long, String>> Handle = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<Long, Boolean> Cancel = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<Long, Integer> Booted = new ConcurrentHashMap<>();
protected static final ConcurrentHashMap<Long, Boolean> Untune = new ConcurrentHashMap<>();
Expand All @@ -99,23 +101,27 @@ public class TinyTaskExecServiceImpl implements TinyTaskExecService, Initializin
@Setter(onMethod_ = { @Autowired })
protected TinyTaskExecProp execProp;

protected volatile boolean isShutdown = false;
protected volatile boolean isShutdownFast = false;
protected volatile boolean isShutdownSlow = false;

@Override
public void afterPropertiesSet() {
isShutdown = false;
isShutdownFast = false;
isShutdownSlow = false;
}

@Override
@EventListener(ContextClosedEvent.class)
public void destroy() {
isShutdown = true;
log.info("tiny-task shutdown for ContextClosedEvent");
isShutdownFast = true;
isShutdownSlow = true;
for (var en : Handle.entrySet()) {
var task = en.getValue();
if (task.isDone()) continue;

log.info("try to cancal tiny-task for shutdown, id={}", en.getKey());
var tvl = en.getValue();
var task = tvl.one();
log.info("try to cancal tiny-task for shutdown, id={}, key={}, next_sys={}", en.getKey(), tvl.three(), DateLocaling.sysLdt(tvl.two()));
task.cancel(false);
}
Handle.clear();
}

@Override
Expand All @@ -126,19 +132,31 @@ public boolean launch(long id) {

@Override
public boolean force(long id) {
if (isShutdown) {
log.warn("skip tiny-task for shutdwon, force id={}", id);
if (isShutdownFast && isShutdownSlow) {
log.warn("cancel tiny-task for shutdown all on force, id={}", id);
return false;
}

final WinTaskDefine td = winTaskDefineDao.fetchOneById(id);
if (td == null) {
log.info("skip tiny-task for not found, id={}", id);
log.info("skip tiny-task for not found on force, id={}", id);
return false;
}

final boolean fast = BoxedCastUtil.orTrue(td.getTaskerFast());
final var scheduler = fast ? TaskSchedulerHelper.Fast() : TaskSchedulerHelper.Scheduled();
if (fast && isShutdownFast || !fast && isShutdownSlow) {
log.warn("cancal tiny-task for shutdown on force, fast={}, id={}", fast, id);
return false;
}

final var scheduler = TaskSchedulerHelper.Scheduler(fast);
if (scheduler.getScheduledExecutor().isShutdown()) {
log.warn("cancal tiny-task for Executor shutdown on force, fast={}, id={}", fast, id);
if (fast) isShutdownFast = true;
else isShutdownSlow = true;
return false;
}

scheduler.schedule(() -> {
long execTms = ThreadNow.millis();
long doneTms = -1;
Expand Down Expand Up @@ -190,22 +208,23 @@ public boolean force(long id) {
}
}
}, Instant.ofEpochMilli(ThreadNow.millis()));

return true;
}

@Override
public boolean cancel(long id) {
Cancel.put(id, Boolean.TRUE);
final ScheduledFuture<?> ft = Handle.get(id);
if (ft == null) {
final var tvl = Handle.get(id);
if (tvl == null) {
log.info("cancel not found, id={}", id);
return true;
}
final boolean r = ft.cancel(false);
final boolean r = tvl.one().cancel(false);
if (r) {
Handle.remove(id);
}
log.info("cancel success={}, id={}", r, id);
log.info("cancel success={}, id={}, key={}, next_sys={}", id, tvl.three(), DateLocaling.sysLdt(tvl.two()));
return r;
}

Expand All @@ -220,22 +239,22 @@ public Set<Long> running() {
}

private boolean relaunch(long id) {
if (isShutdown) {
log.warn("skip tiny-task for shutdwon, relaunch id={}", id);
if (isShutdownFast && isShutdownSlow) {
log.warn("cancel tiny-task for shutdown all on relaunch, id={}", id);
return false;
}

final Lock lock = JvmStaticGlobalLock.get(id);
try {
lock.lock();
if (Handle.containsKey(id)) {
log.info("skip tiny-task for launching, id={}", id);
log.info("skip tiny-task on launching, id={}", id);
return false;
}

final WinTaskDefine td = winTaskDefineDao.fetchOneById(id);
if (td == null) {
log.info("skip tiny-task for not found, relaunch id={}", id);
log.info("skip tiny-task for not found on relaunch id={}", id);
return false;
}

Expand All @@ -253,15 +272,21 @@ private boolean relaunch(long id) {
saveNextExec(next, td);

final boolean fast = BoxedCastUtil.orTrue(td.getTaskerFast());
final var taskScheduler = fast ? TaskSchedulerHelper.Fast() : TaskSchedulerHelper.Scheduled();
if (fast && isShutdownFast || !fast && isShutdownSlow) {
log.warn("cancal tiny-task for shutdown on relaunch, fast={}, id={}", fast, id);
return false;
}

if (taskScheduler.getScheduledExecutor().isShutdown()) {
log.error("TaskScheduler={} is shutdown, id={}, prop={}", fast, id, key);
final var scheduler = TaskSchedulerHelper.Scheduler(fast);
if (scheduler.getScheduledExecutor().isShutdown()) {
log.warn("cancal tiny-task for Executor shutdown on relaunch, fast={}, id={}, prop={}", fast, id, key);
if (fast) isShutdownFast = true;
else isShutdownSlow = true;
return false;
}

log.info("prepare tiny-task id={}, prop={}", id, key);
final ScheduledFuture<?> handle = taskScheduler.schedule(() -> {
final ScheduledFuture<?> handle = scheduler.schedule(() -> {
long execTms = ThreadNow.millis();
try {
if (notNextLock(td, execTms)) {
Expand Down Expand Up @@ -333,7 +358,7 @@ private boolean relaunch(long id) {
}, Instant.ofEpochMilli(next));

//
Handle.put(id, handle);
Handle.put(id, U.of(handle, next, key));
return true;
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class TinyTaskServiceImpl implements TinyTaskService {
@Override
@NotNull
public ThreadPoolTaskScheduler referScheduler(boolean fast) {
return fast ? TaskSchedulerHelper.Fast() : TaskSchedulerHelper.Scheduled();
return TaskSchedulerHelper.Scheduler(fast);
}

@Override
Expand Down
Loading

0 comments on commit b360a24

Please sign in to comment.